
HDFS-13609. [SBN read] Edit Tail Fast Path Part 3: NameNode-side changes to support tailing edits via RPC. Contributed by Erik Krogen.

Erik Krogen committed 7 years ago
Commit ac3eeeeb3c
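
This commit wires up the NameNode side of the edit tail fast path: QuorumJournalManager gains an RPC-based path in selectInputStreams() that fetches recent edits from the JournalNodes' in-memory cache via getJournaledEdits (introduced in the earlier parts of this series), and falls back to the existing streaming mechanism whenever the RPC path fails.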

+ 7 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java

@@ -22,6 +22,7 @@ import java.net.URL;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
@@ -107,6 +108,12 @@ interface AsyncLogger {
    * Begin a new epoch on the target node.
    */
   public ListenableFuture<NewEpochResponseProto> newEpoch(long epoch);
+
+  /**
+   * Fetch journaled edits from the cache.
+   */
+  public ListenableFuture<GetJournaledEditsResponseProto> getJournaledEdits(
+      long fromTxnId, int maxTransactions);
 
   /**
    * Fetch the list of edit logs available on the remote node.
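
For context, a caller consumes this like any other AsyncLogger operation: the Guava ListenableFuture resolves with the JournalNode's response or fails with the RPC's exception. A minimal usage sketch, assuming a concrete AsyncLogger such as the IPCLoggerChannel shown later in this diff (the helper method and the txn values are illustrative only, not part of the patch):

import com.google.common.util.concurrent.ListenableFuture;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;

// Hypothetical helper: ask one logger for up to 5000 txns starting at txn ID 1
static int fetchTxnCount(AsyncLogger logger) throws Exception {
  ListenableFuture<GetJournaledEditsResponseProto> future =
      logger.getJournaledEdits(1L, 5000);
  // Blocks until the RPC completes; throws ExecutionException if the
  // JournalNode could not serve the request (e.g. a cache miss)
  return future.get().getTxnCount();
}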

+ 14 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java

@@ -26,6 +26,7 @@ import java.util.concurrent.TimeoutException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
@@ -261,6 +262,19 @@ class AsyncLoggerSet {
     return QuorumCall.create(calls);
   }
 
+  public QuorumCall<AsyncLogger, GetJournaledEditsResponseProto>
+  getJournaledEdits(long fromTxnId, int maxTransactions) {
+    Map<AsyncLogger,
+        ListenableFuture<GetJournaledEditsResponseProto>> calls
+        = Maps.newHashMap();
+    for (AsyncLogger logger : loggers) {
+      ListenableFuture<GetJournaledEditsResponseProto> future =
+          logger.getJournaledEdits(fromTxnId, maxTransactions);
+      calls.put(logger, future);
+    }
+    return QuorumCall.create(calls);
+  }
+
   public QuorumCall<AsyncLogger, RemoteEditLogManifest> getEditLogManifest(
       long fromTxnId, boolean inProgressOk) {
     Map<AsyncLogger,

+ 14 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java

@@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.qjournal.protocol.JournalOutOfSyncException;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
@@ -559,6 +560,19 @@ public class IPCLoggerChannel implements AsyncLogger {
     });
   }
 
+  @Override
+  public ListenableFuture<GetJournaledEditsResponseProto> getJournaledEdits(
+      long fromTxnId, int maxTransactions) {
+    return parallelExecutor.submit(
+        new Callable<GetJournaledEditsResponseProto>() {
+          @Override
+          public GetJournaledEditsResponseProto call() throws IOException {
+            return getProxy().getJournaledEdits(journalId, nameServiceId,
+                fromTxnId, maxTransactions);
+          }
+        });
+  }
+
   @Override
   public ListenableFuture<RemoteEditLogManifest> getEditLogManifest(
       final long fromTxnId, final boolean inProgressOk) {

+ 106 - 5
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java

@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URL;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -36,6 +37,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
@@ -67,6 +69,14 @@ import com.google.protobuf.TextFormat;
 public class QuorumJournalManager implements JournalManager {
   static final Logger LOG = LoggerFactory.getLogger(QuorumJournalManager.class);
 
+  // This config is not publicly exposed
+  static final String QJM_RPC_MAX_TXNS_KEY =
+      "dfs.ha.tail-edits.qjm.rpc.max-txns";
+  static final int QJM_RPC_MAX_TXNS_DEFAULT = 5000;
+
+  // Maximum number of transactions to fetch at a time when using the
+  // RPC edit fetch mechanism
+  private final int maxTxnsPerRpc;
   // Timeouts for which the QJM will wait for each of the following actions.
   private final int startSegmentTimeoutMs;
   private final int prepareRecoveryTimeoutMs;
@@ -125,6 +135,10 @@ public class QuorumJournalManager implements JournalManager {
     this.nameServiceId = nameServiceId;
     this.loggers = new AsyncLoggerSet(createLoggers(loggerFactory));
 
+    this.maxTxnsPerRpc =
+        conf.getInt(QJM_RPC_MAX_TXNS_KEY, QJM_RPC_MAX_TXNS_DEFAULT);
+    Preconditions.checkArgument(maxTxnsPerRpc > 0,
+        "Must specify %s greater than 0!", QJM_RPC_MAX_TXNS_KEY);
     // Configure timeouts.
     this.startSegmentTimeoutMs = conf.getInt(
         DFSConfigKeys.DFS_QJOURNAL_START_SEGMENT_TIMEOUT_KEY,
@@ -479,17 +493,104 @@
   public void selectInputStreams(Collection<EditLogInputStream> streams,
       long fromTxnId, boolean inProgressOk,
       boolean onlyDurableTxns) throws IOException {
+    if (inProgressOk) {
+      LOG.info("Tailing edits starting from txn ID " + fromTxnId +
+          " via RPC mechanism");
+      try {
+        Collection<EditLogInputStream> rpcStreams = new ArrayList<>();
+        selectRpcInputStreams(rpcStreams, fromTxnId, onlyDurableTxns);
+        streams.addAll(rpcStreams);
+        return;
+      } catch (IOException ioe) {
+        LOG.warn("Encountered exception while tailing edits >= " + fromTxnId +
+            " via RPC; falling back to streaming.", ioe);
+      }
+    }
+    selectStreamingInputStreams(streams, fromTxnId, inProgressOk,
+        onlyDurableTxns);
+  }
+
+  /**
+   * Select input streams from the journals, specifically using the RPC
+   * mechanism optimized for low latency.
+   *
+   * @param streams The collection to store the return streams into.
+   * @param fromTxnId Select edits starting from this transaction ID
+   * @param onlyDurableTxns Iff true, only include transactions which have been
+   *                        committed to a quorum of the journals.
+   * @throws IOException Upon issues, including cache misses on the journals.
+   */
+  private void selectRpcInputStreams(Collection<EditLogInputStream> streams,
+      long fromTxnId, boolean onlyDurableTxns) throws IOException {
+    QuorumCall<AsyncLogger, GetJournaledEditsResponseProto> q =
+        loggers.getJournaledEdits(fromTxnId, maxTxnsPerRpc);
+    Map<AsyncLogger, GetJournaledEditsResponseProto> responseMap =
+        loggers.waitForWriteQuorum(q, selectInputStreamsTimeoutMs,
+            "selectRpcInputStreams");
+    assert responseMap.size() >= loggers.getMajoritySize() :
+        "Quorum call returned without a majority";
 
 
+    List<Integer> responseCounts = new ArrayList<>();
+    for (GetJournaledEditsResponseProto resp : responseMap.values()) {
+      responseCounts.add(resp.getTxnCount());
+    }
+    Collections.sort(responseCounts);
+    int highestTxnCount = responseCounts.get(responseCounts.size() - 1);
+    if (LOG.isDebugEnabled() || highestTxnCount < 0) {
+      StringBuilder msg = new StringBuilder("Requested edits starting from ");
+      msg.append(fromTxnId).append("; got ").append(responseMap.size())
+          .append(" responses: <");
+      for (Map.Entry<AsyncLogger, GetJournaledEditsResponseProto> ent :
+          responseMap.entrySet()) {
+        msg.append("[").append(ent.getKey()).append(", ")
+            .append(ent.getValue().getTxnCount()).append("],");
+      }
+      msg.append(">");
+      if (highestTxnCount < 0) {
+        throw new IOException("Did not get any valid JournaledEdits " +
+            "responses: " + msg);
+      } else {
+        LOG.debug(msg.toString());
+      }
+    }
+
+    int maxAllowedTxns = !onlyDurableTxns ? highestTxnCount :
+        responseCounts.get(responseCounts.size() - loggers.getMajoritySize());
+    if (maxAllowedTxns == 0) {
+      LOG.debug("No new edits available in logs; requested starting from " +
+          "ID " + fromTxnId);
+      return;
+    }
+    LOG.info("Selected loggers with >= " + maxAllowedTxns +
+        " transactions starting from " + fromTxnId);
+    PriorityQueue<EditLogInputStream> allStreams = new PriorityQueue<>(
+        JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR);
+    for (GetJournaledEditsResponseProto resp : responseMap.values()) {
+      long endTxnId = fromTxnId - 1 +
+          Math.min(maxAllowedTxns, resp.getTxnCount());
+      allStreams.add(EditLogFileInputStream.fromByteString(
+          resp.getEditLog(), fromTxnId, endTxnId, true));
+    }
+    JournalSet.chainAndMakeRedundantStreams(streams, allStreams, fromTxnId);
+  }
+
+  /**
+   * Select input streams from the journals, specifically using the streaming
+   * mechanism optimized for resiliency / bulk load.
+   */
+  private void selectStreamingInputStreams(
+      Collection<EditLogInputStream> streams, long fromTxnId,
+      boolean inProgressOk, boolean onlyDurableTxns) throws IOException {
     QuorumCall<AsyncLogger, RemoteEditLogManifest> q =
         loggers.getEditLogManifest(fromTxnId, inProgressOk);
     Map<AsyncLogger, RemoteEditLogManifest> resps =
         loggers.waitForWriteQuorum(q, selectInputStreamsTimeoutMs,
-            "selectInputStreams");
-    
-    LOG.debug("selectInputStream manifests:\n" +
+            "selectStreamingInputStreams");
+
+    LOG.debug("selectStreamingInputStream manifests:\n" +
         Joiner.on("\n").withKeyValueSeparator(": ").join(resps));
-    
-    final PriorityQueue<EditLogInputStream> allStreams = 
+
+    final PriorityQueue<EditLogInputStream> allStreams =
         new PriorityQueue<EditLogInputStream>(64,
             JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR);
     for (Map.Entry<AsyncLogger, RemoteEditLogManifest> e : resps.entrySet()) {
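
To make the selection arithmetic above concrete: the per-JournalNode transaction counts are sorted ascending; the last element is the most any single JN can serve, while the element at index (size - majoritySize) is the highest count already present on a quorum of JNs, and hence durable. A standalone sketch with illustrative values (not part of the patch):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class DurableTxnCountSketch {
  public static void main(String[] args) {
    // Suppose three JNs reported 1, 5, and 3 available transactions
    List<Integer> responseCounts = new ArrayList<>(Arrays.asList(1, 5, 3));
    int majoritySize = 2; // quorum size for a 3-node journal set
    Collections.sort(responseCounts); // [1, 3, 5]
    // Most txns any one JN can serve (used when onlyDurableTxns is false)
    int highestTxnCount = responseCounts.get(responseCounts.size() - 1); // 5
    // Highest count present on at least majoritySize JNs, i.e. durable
    int durableTxnCount =
        responseCounts.get(responseCounts.size() - majoritySize); // 3
    System.out.println("highest=" + highestTxnCount
        + ", durable=" + durableTxnCount);
  }
}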

+ 44 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hdfs.server.namenode;
 
+import com.google.protobuf.ByteString;
 import java.io.BufferedInputStream;
 import java.io.DataInputStream;
 import java.io.EOFException;
@@ -119,6 +120,23 @@ public class EditLogFileInputStream extends EditLogInputStream {
     return new EditLogFileInputStream(new URLLog(connectionFactory, url),
         startTxId, endTxId, inProgress);
   }
+
+  /**
+   * Create an EditLogInputStream from a {@link ByteString}, i.e. an in-memory
+   * collection of bytes.
+   *
+   * @param bytes The byte string to read from
+   * @param startTxId the expected starting transaction ID
+   * @param endTxId the expected ending transaction ID
+   * @param inProgress whether the log is in-progress
+   * @return An edit stream to read from
+   */
+  public static EditLogInputStream fromByteString(ByteString bytes,
+      long startTxId, long endTxId, boolean inProgress) {
+    return new EditLogFileInputStream(new ByteStringLog(bytes,
+        String.format("ByteStringEditLog[%d, %d]", startTxId, endTxId)),
+        startTxId, endTxId, inProgress);
+  }
 
   private EditLogFileInputStream(LogSource log,
       long firstTxId, long lastTxId,
@@ -376,6 +394,32 @@ public class EditLogFileInputStream extends EditLogInputStream {
     public long length();
     public String getName();
   }
+
+  private static class ByteStringLog implements LogSource {
+    private final ByteString bytes;
+    private final String name;
+
+    public ByteStringLog(ByteString bytes, String name) {
+      this.bytes = bytes;
+      this.name = name;
+    }
+
+    @Override
+    public InputStream getInputStream() {
+      return bytes.newInput();
+    }
+
+    @Override
+    public long length() {
+      return bytes.size();
+    }
+
+    @Override
+    public String getName() {
+      return name;
+    }
+
+  }
 
   private static class FileLog implements LogSource {
     private final File file;
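
As a usage sketch for the new factory method: the bytes would typically come from GetJournaledEditsResponseProto.getEditLog(), and the txn ID range here (1..5) is illustrative only:

import java.io.IOException;
import com.google.protobuf.ByteString;
import org.apache.hadoop.hdfs.server.namenode.EditLogFileInputStream;
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;

// Hypothetical helper: iterate the ops contained in an in-memory edit log
static void dumpEdits(ByteString editLogBytes) throws IOException {
  EditLogInputStream elis = EditLogFileInputStream.fromByteString(
      editLogBytes, 1L, 5L, true); // expect txns 1..5 of an in-progress log
  try {
    FSEditLogOp op;
    while ((op = elis.readOp()) != null) { // null signals end of stream
      System.out.println(op);
    }
  } finally {
    elis.close();
  }
}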

+ 5 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java

@@ -151,7 +151,11 @@ public class EditLogTailer {
   private int maxRetries;
 
   /**
-   * Whether the tailer should tail the in-progress edit log segments.
+   * Whether the tailer should tail the in-progress edit log segments. If true,
+   * this will also attempt to optimize for latency when tailing the edit logs
+   * (if using the
+   * {@link org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager}, this
+   * implies using the RPC-based mechanism to tail edits).
    */
   private final boolean inProgressOk;
 

+ 3 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

@@ -3154,7 +3154,9 @@
   <description>
     Whether enable standby namenode to tail in-progress edit logs.
     Clients might want to turn it on when they want Standby NN to have
-    more up-to-date data.
+    more up-to-date data. When using the QuorumJournalManager, this enables
+    tailing of edit logs via the RPC-based mechanism, rather than streaming,
+    which allows for much fresher data.
   </description>
 </property>
 

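
A minimal sketch of enabling this programmatically (the public key is documented above; dfs.ha.tail-edits.qjm.rpc.max-txns is the package-private knob from this patch and is not publicly exposed, so treat tuning it as unofficial):

import org.apache.hadoop.conf.Configuration;

public class EnableFastTailing {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Public key: tail in-progress segments; with QuorumJournalManager
    // this implies the RPC-based fast path
    conf.setBoolean("dfs.ha.tail-edits.in-progress", true);
    // Unofficial knob: max transactions fetched per RPC (default 5000)
    conf.setInt("dfs.ha.tail-edits.qjm.rpc.max-txns", 5000);
  }
}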
+ 130 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java

@@ -44,6 +44,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
 import org.apache.hadoop.hdfs.qjournal.QJMTestUtil;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
@@ -91,6 +92,10 @@ public class TestQuorumJournalManager {
     conf = new Configuration();
     // Don't retry connections - it just slows down the tests.
     conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
+    // Turn off IPC client caching to handle daemon restarts.
+    conf.setInt(
+        CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY, 0);
+    conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true);
 
     cluster = new MiniJournalCluster.Builder(conf)
         .baseDir(GenericTestUtils.getRandomizedTestDir().getAbsolutePath())
@@ -959,6 +964,131 @@
     qjm2.selectInputStreams(streams, 1, true, true);
     verifyEdits(streams, 1, 8);
   }
+
+  @Test
+  public void testSelectViaRpcWithDurableTransactions() throws Exception {
+    // Two loggers will have up to ID 5, one will have up to ID 6
+    failLoggerAtTxn(spies.get(0), 6);
+    failLoggerAtTxn(spies.get(1), 6);
+    EditLogOutputStream stm =
+        qjm.startLogSegment(1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
+    writeTxns(stm, 1, 5);
+    try {
+      writeTxns(stm, 6, 1);
+      fail("Did not fail to write when only a minority succeeded");
+    } catch (QuorumException qe) {
+      GenericTestUtils.assertExceptionContains(
+          "too many exceptions to achieve quorum size 2/3", qe);
+    }
+
+    List<EditLogInputStream> streams = new ArrayList<>();
+    qjm.selectInputStreams(streams, 1, true, true);
+    verifyEdits(streams, 1, 5);
+    IOUtils.closeStreams(streams.toArray(new Closeable[0]));
+    for (AsyncLogger logger : spies) {
+      Mockito.verify(logger, Mockito.times(1)).getJournaledEdits(1,
+          QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT);
+    }
+  }
+
+  @Test
+  public void testSelectViaRpcWithoutDurableTransactions() throws Exception {
+    setupLoggers345();
+    futureThrows(new IOException()).when(spies.get(1)).getJournaledEdits(1,
+        QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT);
+
+    List<EditLogInputStream> streams = new ArrayList<>();
+    qjm.selectInputStreams(streams, 1, true, false);
+    verifyEdits(streams, 1, 5);
+    IOUtils.closeStreams(streams.toArray(new Closeable[0]));
+    for (AsyncLogger logger : spies) {
+      Mockito.verify(logger, Mockito.times(1)).getJournaledEdits(1,
+          QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT);
+    }
+  }
+
+  @Test
+  public void testSelectViaRpcOneDeadJN() throws Exception {
+    EditLogOutputStream stm =
+        qjm.startLogSegment(1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
+    writeTxns(stm, 1, 10);
+
+    cluster.getJournalNode(0).stopAndJoin(0);
+
+    List<EditLogInputStream> streams = new ArrayList<>();
+    qjm.selectInputStreams(streams, 1, true, false);
+    verifyEdits(streams, 1, 10);
+    IOUtils.closeStreams(streams.toArray(new Closeable[0]));
+  }
+
+  @Test
+  public void testSelectViaRpcTwoDeadJNs() throws Exception {
+    EditLogOutputStream stm =
+        qjm.startLogSegment(1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
+    writeTxns(stm, 1, 10);
+
+    cluster.getJournalNode(0).stopAndJoin(0);
+    cluster.getJournalNode(1).stopAndJoin(0);
+
+    try {
+      qjm.selectInputStreams(new ArrayList<>(), 1, true, false);
+      fail("");
+    } catch (QuorumException qe) {
+      GenericTestUtils.assertExceptionContains(
+          "too many exceptions to achieve quorum size 2/3", qe);
+    }
+  }
+
+  @Test
+  public void testSelectViaRpcTwoJNsError() throws Exception {
+    EditLogOutputStream stm =
+        qjm.startLogSegment(1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
+    writeTxns(stm, 1, 10);
+    writeTxns(stm, 11, 1);
+
+    futureThrows(new IOException()).when(spies.get(0)).getJournaledEdits(1,
+        QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT);
+    futureThrows(new IOException()).when(spies.get(1)).getJournaledEdits(1,
+        QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT);
+
+    List<EditLogInputStream> streams = new ArrayList<>();
+    qjm.selectInputStreams(streams, 1, true, true);
+    // This should still succeed as the QJM should fall back to the streaming
+    // mechanism for fetching edits
+    verifyEdits(streams, 1, 11);
+    IOUtils.closeStreams(streams.toArray(new Closeable[0]));
+
+    for (AsyncLogger logger : spies) {
+      Mockito.verify(logger, Mockito.times(1)).getEditLogManifest(1, true);
+    }
+  }
+
+  @Test
+  public void testSelectViaRpcAfterJNRestart() throws Exception {
+    EditLogOutputStream stm =
+        qjm.startLogSegment(1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
+    writeTxns(stm, 1, 10);
+    qjm.finalizeLogSegment(1, 10);
+
+    // Close to avoid connections hanging around after the JNs are restarted
+    qjm.close();
+    for (int i = 0; i < cluster.getNumNodes(); i++) {
+      cluster.restartJournalNode(i);
+    }
+    cluster.waitActive();
+
+    qjm = createSpyingQJM();
+    spies = qjm.getLoggerSetForTests().getLoggersForTests();
+    List<EditLogInputStream> streams = new ArrayList<>();
+    qjm.selectInputStreams(streams, 1, true, true);
+    // This should still succeed as the QJM should fall back to the streaming
+    // mechanism for fetching edits
+    verifyEdits(streams, 1, 10);
+    IOUtils.closeStreams(streams.toArray(new Closeable[0]));
+
+    for (AsyncLogger logger : spies) {
+      Mockito.verify(logger, Mockito.times(1)).getEditLogManifest(1, true);
+    }
+  }
 
   private QuorumJournalManager createSpyingQJM()
       throws IOException, URISyntaxException {

+ 98 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java

@@ -17,11 +17,14 @@
  */
 package org.apache.hadoop.hdfs.qjournal.client;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.eq;
 
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.net.URI;
 import java.util.List;
@@ -29,11 +32,11 @@ import java.util.List;
 import org.junit.Assert;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.qjournal.client.AsyncLogger;
-import org.apache.hadoop.hdfs.qjournal.client.QuorumException;
-import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
+import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
+import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
 import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
@@ -45,11 +48,15 @@ import org.mockito.Mockito;
 import org.mockito.stubbing.Stubber;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
+import com.google.protobuf.ByteString;
 
 import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.writeOp;
+import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.createTxnData;
+import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.verifyEdits;
 
 /**
  * True unit tests for QuorumJournalManager
@@ -217,6 +224,94 @@ public class TestQuorumJournalManagerUnit {
     Mockito.verify(spyLoggers.get(0)).setCommittedTxId(1L);
   }
 
+  @Test
+  public void testReadRpcInputStreams() throws Exception {
+    for (int jn = 0; jn < 3; jn++) {
+      futureReturns(getJournaledEditsResponse(1, 3))
+          .when(spyLoggers.get(jn)).getJournaledEdits(1,
+          QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT);
+    }
+
+    List<EditLogInputStream> streams = Lists.newArrayList();
+    qjm.selectInputStreams(streams, 1, true, true);
+    assertEquals(1, streams.size());
+    verifyEdits(streams, 1, 3);
+  }
+
+  @Test
+  public void testReadRpcMismatchedInputStreams() throws Exception {
+    for (int jn = 0; jn < 3; jn++) {
+      futureReturns(getJournaledEditsResponse(1, jn + 1))
+          .when(spyLoggers.get(jn)).getJournaledEdits(1,
+          QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT);
+    }
+
+    List<EditLogInputStream> streams = Lists.newArrayList();
+    qjm.selectInputStreams(streams, 1, true, true);
+    assertEquals(1, streams.size());
+    verifyEdits(streams, 1, 2);
+  }
+
+  @Test
+  public void testReadRpcInputStreamsOneSlow() throws Exception {
+    for (int jn = 0; jn < 2; jn++) {
+      futureReturns(getJournaledEditsResponse(1, jn + 1))
+          .when(spyLoggers.get(jn)).getJournaledEdits(1,
+          QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT);
+    }
+    Mockito.doReturn(SettableFuture.create())
+        .when(spyLoggers.get(2)).getJournaledEdits(1,
+        QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT);
+
+    List<EditLogInputStream> streams = Lists.newArrayList();
+    qjm.selectInputStreams(streams, 1, true, true);
+    assertEquals(1, streams.size());
+    verifyEdits(streams, 1, 1);
+  }
+
+  @Test
+  public void testReadRpcInputStreamsOneException() throws Exception {
+    for (int jn = 0; jn < 2; jn++) {
+      futureReturns(getJournaledEditsResponse(1, jn + 1))
+          .when(spyLoggers.get(jn)).getJournaledEdits(1,
+          QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT);
+    }
+    futureThrows(new IOException()).when(spyLoggers.get(2))
+        .getJournaledEdits(1, QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT);
+
+    List<EditLogInputStream> streams = Lists.newArrayList();
+    qjm.selectInputStreams(streams, 1, true, true);
+    assertEquals(1, streams.size());
+    verifyEdits(streams, 1, 1);
+  }
+
+  @Test
+  public void testReadRpcInputStreamsNoNewEdits() throws Exception {
+    for (int jn = 0; jn < 3; jn++) {
+      futureReturns(GetJournaledEditsResponseProto.newBuilder()
+          .setTxnCount(0).setEditLog(ByteString.EMPTY).build())
+          .when(spyLoggers.get(jn))
+          .getJournaledEdits(1, QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT);
+    }
+
+    List<EditLogInputStream> streams = Lists.newArrayList();
+    qjm.selectInputStreams(streams, 1, true, true);
+    assertEquals(0, streams.size());
+  }
+
+  private GetJournaledEditsResponseProto getJournaledEditsResponse(
+      int startTxn, int numTxns) throws Exception {
+    ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+    EditLogFileOutputStream.writeHeader(
+        NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION,
+        new DataOutputStream(byteStream));
+    byteStream.write(createTxnData(startTxn, numTxns));
+    return GetJournaledEditsResponseProto.newBuilder()
+        .setTxnCount(numTxns)
+        .setEditLog(ByteString.copyFrom(byteStream.toByteArray()))
+        .build();
+  }
+
   private EditLogOutputStream createLogSegment() throws IOException {
     futureReturns(null).when(spyLoggers.get(0)).startLogSegment(Mockito.anyLong(),
         Mockito.eq(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION));

+ 18 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileInputStream.java

@@ -32,6 +32,7 @@ import java.net.HttpURLConnection;
 import java.net.URL;
 import java.util.EnumMap;
 
+import com.google.protobuf.ByteString;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -80,6 +81,23 @@ public class TestEditLogFileInputStream {
     elis.close();
   }
 
+  @Test
+  public void testByteStringLog() throws Exception {
+    ByteString bytes = ByteString.copyFrom(FAKE_LOG_DATA);
+    EditLogInputStream elis = EditLogFileInputStream.fromByteString(bytes,
+        HdfsServerConstants.INVALID_TXID, HdfsServerConstants.INVALID_TXID,
+        true);
+    // Read the edit log and verify that all of the data is present
+    EnumMap<FSEditLogOpCodes, Holder<Integer>> counts = FSImageTestUtil
+        .countEditLogOpTypes(elis);
+    assertThat(counts.get(FSEditLogOpCodes.OP_ADD).held, is(1));
+    assertThat(counts.get(FSEditLogOpCodes.OP_SET_GENSTAMP_V1).held, is(1));
+    assertThat(counts.get(FSEditLogOpCodes.OP_CLOSE).held, is(1));
+
+    assertEquals(FAKE_LOG_DATA.length, elis.length());
+    elis.close();
+  }
+
   /**
    * Regression test for HDFS-8965 which verifies that
    * FSEditLogFileInputStream#scanOp verifies Op checksums.