
HDFS-13608. [SBN read] Edit Tail Fast Path Part 2: Add ability for JournalNode to serve edits via RPC. Contributed by Erik Krogen.

Erik Krogen, 7 years ago
parent revision 88d65af8a1

+ 5 - 0
hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md

@@ -289,6 +289,11 @@ The server-side metrics for a journal from the JournalNode's perspective. Each m
 | `LastWrittenTxId` | The highest transaction id stored on this JournalNode |
 | `LastPromisedEpoch` | The last epoch number which this node has promised not to accept any lower epoch, or 0 if no promises have been made |
 | `LastJournalTimestamp` | The timestamp of last successfully written transaction |
+| `TxnsServedViaRpc` | Number of transactions served via the RPC mechanism |
+| `BytesServedViaRpc` | Number of bytes served via the RPC mechanism |
+| `RpcRequestCacheMissAmountNumMisses` | Number of RPC requests which could not be served due to lack of data in the cache |
+| `RpcRequestCacheMissAmountAvgTxns` | The average number of transactions by which a request missed the cache; for example, if transaction ID 10 is requested and the cache's oldest transaction is ID 15, a value of 5 will be added to this average |
+| `RpcEmptyResponses` | Number of RPC requests with zero edits returned |
 
 datanode
 --------
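
The new counters surface through the JournalNode's metrics system alongside the existing ones in the table above. As a hedged sketch (the hostname, the default JournalNode HTTP port 8480, and the bean name pattern are assumptions for illustration, not part of this change), they could be inspected through the daemon's /jmx servlet:

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.URL;
    import java.nio.charset.StandardCharsets;

    public class DumpJournalRpcMetrics {
      public static void main(String[] args) throws Exception {
        // Query the JournalNode's JMX servlet for the per-journal metrics bean
        // (assumed to be registered as "Journal-<journal id>").
        URL url = new URL("http://jn0.example.com:8480/jmx"
            + "?qry=Hadoop:service=JournalNode,name=Journal-*");
        try (BufferedReader in = new BufferedReader(
            new InputStreamReader(url.openStream(), StandardCharsets.UTF_8))) {
          String line;
          while ((line = in.readLine()) != null) {
            // JSON output includes TxnsServedViaRpc, BytesServedViaRpc,
            // RpcRequestCacheMissAmountNumMisses/AvgTxns and RpcEmptyResponses.
            System.out.println(line);
          }
        }
      }
    }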

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -183,6 +183,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final long    DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_DEFAULT = 60;
   public static final String DFS_HA_TAILEDITS_ALL_NAMESNODES_RETRY_KEY = "dfs.ha.tail-edits.namenode-retries";
   public static final int DFS_HA_TAILEDITS_ALL_NAMESNODES_RETRY_DEFAULT = 3;
+  public static final String  DFS_HA_TAILEDITS_INPROGRESS_KEY =
+      "dfs.ha.tail-edits.in-progress";
+  public static final boolean DFS_HA_TAILEDITS_INPROGRESS_DEFAULT = false;
   public static final String  DFS_NAMENODE_CHECKPOINT_PERIOD_KEY =
       HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY;
   public static final long    DFS_NAMENODE_CHECKPOINT_PERIOD_DEFAULT = 3600;
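
For reference, a minimal sketch of enabling the new key programmatically (operators would normally set dfs.ha.tail-edits.in-progress in hdfs-site.xml instead; the class name here is hypothetical). The test changes further below enable it the same way:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    public class EnableRpcEditTailing {
      public static Configuration newConf() {
        // Enable in-progress edit tailing; without this the JournalNode builds
        // no JournaledEditsCache and getJournaledEdits() requests are rejected.
        Configuration conf = new Configuration();
        conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true);
        return conf;
      }
    }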

+ 23 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java

@@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
@@ -133,7 +134,28 @@ public interface QJournalProtocol {
   public GetEditLogManifestResponseProto getEditLogManifest(String jid,
       long sinceTxId, boolean inProgressOk)
       throws IOException;
-  
+
+  /**
+   * Fetch edit logs present in the Journal's in-memory cache of edits
+   * ({@link org.apache.hadoop.hdfs.qjournal.server.JournaledEditsCache}).
+   * To enable this cache, in-progress edit log tailing must be enabled via the
+   * {@value DFSConfigKeys#DFS_HA_TAILEDITS_INPROGRESS_KEY} configuration key.
+   *
+   * @param jid The ID of the journal from which to fetch edits.
+   * @param nameServiceId The ID of the namespace for which to fetch edits.
+   * @param sinceTxId Fetch edits starting at this transaction ID
+   * @param maxTxns Request at most this many transactions to be returned
+   * @throws IOException If there was an issue encountered while fetching edits
+   *     from the cache, including a cache miss (cache does not contain the
+   *     requested edits). The caller should then attempt to fetch the edits via
+   *     the streaming mechanism (starting with
+   *     {@link #getEditLogManifest(String, String, long, boolean)}).
+   * @return Response containing serialized edits to be loaded
+   * @see org.apache.hadoop.hdfs.qjournal.server.JournaledEditsCache
+   */
+  GetJournaledEditsResponseProto getJournaledEdits(String jid,
+      String nameServiceId, long sinceTxId, int maxTxns) throws IOException;
+
   /**
    * Begin the recovery process for a given segment. See the HDFS-3077
    * design document for details.
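
As a hedged illustration of the contract described in the Javadoc above (the helper class and method are hypothetical; only the two protocol calls come from this interface), a caller holding a QJournalProtocol proxy would try the RPC fast path first and fall back to the streaming mechanism on failure:

    import java.io.IOException;
    import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
    import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
    import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;

    class RpcFirstTailingSketch {
      /** Try the RPC fast path; on any IOException, fall back to the manifest. */
      static void tailEdits(QJournalProtocol proxy, String jid, long sinceTxId)
          throws IOException {
        try {
          GetJournaledEditsResponseProto resp =
              proxy.getJournaledEdits(jid, null, sinceTxId, 5000);
          // resp.getTxnCount() transactions were returned in resp.getEditLog().
        } catch (IOException e) {
          // Cache miss, or the cache is disabled on the JournalNode: fall back
          // to the streaming mechanism, starting from the edit log manifest.
          GetEditLogManifestResponseProto manifest =
              proxy.getEditLogManifest(jid, sinceTxId, true);
          // ... continue with the existing file-based tailing path ...
        }
      }
    }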

+ 14 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java

@@ -45,6 +45,8 @@ import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FormatReq
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FormatResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalCTimeRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalCTimeResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateRequestProto;
@@ -230,6 +232,18 @@ public class QJournalProtocolServerSideTranslatorPB implements QJournalProtocolP
     }
   }
 
+  @Override
+  public GetJournaledEditsResponseProto getJournaledEdits(
+      RpcController controller, GetJournaledEditsRequestProto request)
+      throws ServiceException {
+    try {
+      return impl.getJournaledEdits(request.getJid().getIdentifier(),
+          request.hasNameServiceId() ? request.getNameServiceId() : null,
+          request.getSinceTxId(), request.getMaxTxns());
+    } catch (IOException ioe) {
+      throw new ServiceException(ioe);
+    }
+  }
 
   @Override
   public PrepareRecoveryResponseProto prepareRecovery(RpcController controller,

+ 20 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java

@@ -40,6 +40,8 @@ import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FinalizeL
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FormatRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalCTimeRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalCTimeResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateRequestProto;
@@ -254,6 +256,24 @@ public class QJournalProtocolTranslatorPB implements ProtocolMetaInterface,
     }
   }
 
+  @Override
+  public GetJournaledEditsResponseProto getJournaledEdits(String jid,
+      String nameServiceId, long sinceTxId, int maxTxns) throws IOException {
+    try {
+      GetJournaledEditsRequestProto.Builder req =
+          GetJournaledEditsRequestProto.newBuilder()
+              .setJid(convertJournalId(jid))
+              .setSinceTxId(sinceTxId)
+              .setMaxTxns(maxTxns);
+      if (nameServiceId != null) {
+        req.setNameServiceId(nameServiceId);
+      }
+      return rpcProxy.getJournaledEdits(NULL_CONTROLLER, req.build());
+    } catch (ServiceException se) {
+      throw ProtobufHelper.getRemoteException(se);
+    }
+  }
+
   @Override
   public PrepareRecoveryResponseProto prepareRecovery(RequestInfo reqInfo,
       long segmentTxId) throws IOException {

+ 59 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java

@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.qjournal.server;
 
+import com.google.protobuf.ByteString;
 import java.io.Closeable;
 import java.io.File;
 import java.io.FileInputStream;
@@ -24,7 +25,9 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStreamWriter;
 import java.net.URL;
+import java.nio.ByteBuffer;
 import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
@@ -34,10 +37,12 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.qjournal.protocol.JournalNotFormattedException;
 import org.apache.hadoop.hdfs.qjournal.protocol.JournalOutOfSyncException;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PersistedRecoveryPaxosData;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
@@ -83,6 +88,7 @@ public class Journal implements Closeable {
   // Current writing state
   private EditLogOutputStream curSegment;
   private long curSegmentTxId = HdfsServerConstants.INVALID_TXID;
+  private int curSegmentLayoutVersion = 0;
   private long nextTxId = HdfsServerConstants.INVALID_TXID;
   private long highestWrittenTxId = 0;
   
@@ -131,6 +137,8 @@ public class Journal implements Closeable {
   
   private final FileJournalManager fjm;
 
+  private final JournaledEditsCache cache;
+
   private final JournalMetrics metrics;
 
   private long lastJournalTimestamp = 0;
@@ -149,6 +157,13 @@ public class Journal implements Closeable {
     refreshCachedData();
     
     this.fjm = storage.getJournalManager();
+
+    if (conf.getBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY,
+        DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_DEFAULT)) {
+      this.cache = new JournaledEditsCache(conf);
+    } else {
+      this.cache = null;
+    }
     
     this.metrics = JournalMetrics.create(this);
     
@@ -347,6 +362,7 @@ public class Journal implements Closeable {
     curSegment.abort();
     curSegment = null;
     curSegmentTxId = HdfsServerConstants.INVALID_TXID;
+    curSegmentLayoutVersion = 0;
   }
 
   /**
@@ -385,6 +401,9 @@ public class Journal implements Closeable {
       LOG.trace("Writing txid " + firstTxnId + "-" + lastTxnId +
           " ; journal id: " + journalId);
     }
+    if (cache != null) {
+      cache.storeEdits(records, firstTxnId, lastTxnId, curSegmentLayoutVersion);
+    }
 
     // If the edit has already been marked as committed, we know
     // it has been fsynced on a quorum of other nodes, and we are
@@ -572,6 +591,7 @@ public class Journal implements Closeable {
     
     curSegment = fjm.startLogSegment(txid, layoutVersion);
     curSegmentTxId = txid;
+    curSegmentLayoutVersion = layoutVersion;
     nextTxId = txid;
   }
   
@@ -591,6 +611,7 @@ public class Journal implements Closeable {
         curSegment.close();
         curSegment = null;
         curSegmentTxId = HdfsServerConstants.INVALID_TXID;
+        curSegmentLayoutVersion = 0;
       }
       
       checkSync(nextTxId == endTxId + 1,
@@ -691,6 +712,44 @@ public class Journal implements Closeable {
     return new RemoteEditLogManifest(logs);
   }
 
+  /**
+   * @see QJournalProtocol#getJournaledEdits(String, String, long, int)
+   */
+  public GetJournaledEditsResponseProto getJournaledEdits(long sinceTxId,
+      int maxTxns) throws IOException {
+    if (cache == null) {
+      throw new IOException("The journal edits cache is not enabled, which " +
+          "is a requirement to fetch journaled edits via RPC. Please enable " +
+          "it via " + DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY);
+    }
+    if (sinceTxId > getHighestWrittenTxId()) {
+      // Requested edits that don't exist yet; short-circuit the cache here
+      metrics.rpcEmptyResponses.incr();
+      return GetJournaledEditsResponseProto.newBuilder().setTxnCount(0).build();
+    }
+    try {
+      List<ByteBuffer> buffers = new ArrayList<>();
+      int txnCount = cache.retrieveEdits(sinceTxId, maxTxns, buffers);
+      int totalSize = 0;
+      for (ByteBuffer buf : buffers) {
+        totalSize += buf.remaining();
+      }
+      metrics.txnsServedViaRpc.incr(txnCount);
+      metrics.bytesServedViaRpc.incr(totalSize);
+      ByteString.Output output = ByteString.newOutput(totalSize);
+      for (ByteBuffer buf : buffers) {
+        output.write(buf.array(), buf.position(), buf.remaining());
+      }
+      return GetJournaledEditsResponseProto.newBuilder()
+          .setTxnCount(txnCount)
+          .setEditLog(output.toByteString())
+          .build();
+    } catch (JournaledEditsCache.CacheMissException cme) {
+      metrics.rpcRequestCacheMissAmount.add(cme.getCacheMissAmount());
+      throw cme;
+    }
+  }
+
   /**
    * @return the current state of the given segment, or null if the
    * segment does not exist.
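
To make the shape of the response concrete (a hedged fragment; the class and method names are illustrative, and the header-plus-transactions layout is what the new TestJournal case below asserts), a consumer of the proto could unpack it as follows:

    import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;

    class JournaledEditsResponseSketch {
      /** Illustrative only: unpack the RPC response into raw edit-log bytes. */
      static byte[] extractEdits(GetJournaledEditsResponseProto resp) {
        if (resp.getTxnCount() == 0) {
          // No transactions past the requested txid were available.
          return new byte[0];
        }
        // The payload starts with a standard edit-log file header for the
        // segment's layout version, followed by getTxnCount() serialized
        // transactions.
        return resp.getEditLog().toByteArray();
      }
    }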

+ 19 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java

@@ -25,6 +25,8 @@ import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.lib.MetricsRegistry;
 import org.apache.hadoop.metrics2.lib.MutableCounterLong;
 import org.apache.hadoop.metrics2.lib.MutableQuantiles;
+import org.apache.hadoop.metrics2.lib.MutableStat;
+
 
 /**
  * The server-side metrics for a journal from the JournalNode's
@@ -42,7 +44,23 @@ class JournalMetrics {
   
   @Metric("Number of bytes written since startup")
   MutableCounterLong bytesWritten;
-  
+
+  @Metric("Number of txns served via RPC")
+  MutableCounterLong txnsServedViaRpc;
+
+  @Metric("Number of bytes served via RPC")
+  MutableCounterLong bytesServedViaRpc;
+
+  @Metric
+  MutableStat rpcRequestCacheMissAmount = new MutableStat(
+      "RpcRequestCacheMissAmount", "Number of RPC requests unable to be " +
+      "served due to lack of availability in cache, and how many " +
+      "transactions away the request was from being in the cache.",
+      "Misses", "Txns");
+
+  @Metric("Number of RPC requests with zero edits returned")
+  MutableCounterLong rpcEmptyResponses;
+
   @Metric("Number of batches written where this node was lagging")
   MutableCounterLong batchesWrittenWhileLagging;
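
A hedged note on how this declaration maps onto the two Metrics.md entries added above: a MutableStat named "RpcRequestCacheMissAmount" with sample name "Misses" and value name "Txns" emits a "...NumMisses" count and an "...AvgTxns" average of the values passed to add(). The helper below is purely illustrative:

    import org.apache.hadoop.metrics2.lib.MutableStat;

    class CacheMissStatSketch {
      static void recordMiss(MutableStat rpcRequestCacheMissAmount) {
        // e.g. transaction 10 was requested while the oldest cached
        // transaction is 15, so the request missed the cache by 5 txns.
        rpcRequestCacheMissAmount.add(5);
      }
    }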
   

+ 7 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java

@@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.HDFSPolicyProvider;
 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
@@ -208,6 +209,12 @@ public class JournalNodeRpcServer implements QJournalProtocol {
         .build();
   }
 
+  @Override
+  public GetJournaledEditsResponseProto getJournaledEdits(String jid,
+      String nameServiceId, long sinceTxId, int maxTxns) throws IOException {
+    return jn.getOrCreateJournal(jid).getJournaledEdits(sinceTxId, maxTxns);
+  }
+
   @Override
   public PrepareRecoveryResponseProto prepareRecovery(RequestInfo reqInfo,
       long segmentTxId) throws IOException {

+ 18 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto

@@ -272,6 +272,21 @@ message GetEditLogManifestResponseProto {
   // required NamespaceInfoProto nsInfo = 2;
 }
 
+/**
+ * getJournaledEdits()
+ */
+message GetJournaledEditsRequestProto {
+  required JournalIdProto jid = 1;
+  required uint64 sinceTxId = 2;
+  required uint32 maxTxns = 3;
+  optional string nameServiceId = 4;
+}
+
+message GetJournaledEditsResponseProto {
+  required uint32 txnCount = 1;
+  optional bytes editLog = 2;
+}
+
 /**
  * prepareRecovery()
  */
@@ -351,6 +366,9 @@ service QJournalProtocolService {
   rpc getEditLogManifest(GetEditLogManifestRequestProto)
       returns (GetEditLogManifestResponseProto);
 
+  rpc getJournaledEdits(GetJournaledEditsRequestProto)
+      returns (GetJournaledEditsResponseProto);
+
   rpc prepareRecovery(PrepareRecoveryRequestProto)
       returns (PrepareRecoveryResponseProto);
 

+ 23 - 14
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContextWithHA.java

@@ -52,6 +52,9 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
 
 /**
  * Class is used to test server sending state alignment information to clients
@@ -254,12 +257,15 @@ public class TestStateAlignmentContextWithHA {
       // Collect RpcRequestHeaders for verification later.
       final List<RpcHeaderProtos.RpcRequestHeaderProto.Builder> headers =
           new ArrayList<>();
-      Mockito.doAnswer(a -> {
-        Object[] arguments = a.getArguments();
-        RpcHeaderProtos.RpcRequestHeaderProto.Builder header =
-            (RpcHeaderProtos.RpcRequestHeaderProto.Builder) arguments[0];
-        headers.add(header);
-        return a.callRealMethod();
+      Mockito.doAnswer(new Answer() {
+        @Override
+        public Object answer(InvocationOnMock a) throws Throwable {
+          Object[] arguments = a.getArguments();
+          RpcHeaderProtos.RpcRequestHeaderProto.Builder header =
+              (RpcHeaderProtos.RpcRequestHeaderProto.Builder) arguments[0];
+          headers.add(header);
+          return a.callRealMethod();
+        }
       }).when(spiedAlignContext).updateRequestState(Mockito.any());
 
       DFSTestUtil.writeFile(clearDfs, new Path("/testFile4"), "shv");
@@ -294,14 +300,17 @@ public class TestStateAlignmentContextWithHA {
              (DistributedFileSystem) FileSystem.get(CONF)) {
 
       // Make every client call have a stateId > server's stateId.
-      Mockito.doAnswer(a -> {
-        Object[] arguments = a.getArguments();
-        RpcHeaderProtos.RpcRequestHeaderProto.Builder header =
-            (RpcHeaderProtos.RpcRequestHeaderProto.Builder) arguments[0];
-        try {
-          return a.callRealMethod();
-        } finally {
-          header.setStateId(Long.MAX_VALUE);
+      Mockito.doAnswer(new Answer() {
+        @Override
+        public Object answer(InvocationOnMock a) throws Throwable {
+          Object[] arguments = a.getArguments();
+          RpcHeaderProtos.RpcRequestHeaderProto.Builder header =
+              (RpcHeaderProtos.RpcRequestHeaderProto.Builder) arguments[0];
+          try {
+            return a.callRealMethod();
+          } finally {
+            header.setStateId(Long.MAX_VALUE);
+          }
         }
       }).when(spiedAlignContext).updateRequestState(Mockito.any());
 

+ 46 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java

@@ -17,19 +17,25 @@
  */
 package org.apache.hadoop.hdfs.qjournal.server;
 
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import com.google.common.primitives.Bytes;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.qjournal.QJMTestUtil;
 import org.apache.hadoop.hdfs.qjournal.protocol.JournalOutOfSyncException;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProtoOrBuilder;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
@@ -38,6 +44,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
+import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.io.IOUtils;
@@ -71,6 +78,8 @@ public class TestJournal {
   public void setup() throws Exception {
     FileUtil.fullyDelete(TEST_LOG_DIR);
     conf = new Configuration();
+    // Enable fetching edits via RPC
+    conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true);
     journal = new Journal(conf, TEST_LOG_DIR, JID, StartupOption.REGULAR,
       mockErrorReporter);
     journal.format(FAKE_NSINFO);
@@ -434,4 +443,41 @@ public class TestJournal {
     }
   }
 
+  @Test
+  public void testReadFromCache() throws Exception {
+    journal.newEpoch(FAKE_NSINFO, 1);
+    journal.startLogSegment(makeRI(1), 1,
+        NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
+    journal.journal(makeRI(2), 1, 1, 5, QJMTestUtil.createTxnData(1, 5));
+    journal.journal(makeRI(3), 1, 6, 5, QJMTestUtil.createTxnData(6, 5));
+    journal.journal(makeRI(4), 1, 11, 5, QJMTestUtil.createTxnData(11, 5));
+    assertJournaledEditsTxnCountAndContents(1, 7, 7,
+        NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
+    assertJournaledEditsTxnCountAndContents(1, 30, 15,
+        NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
+
+    journal.finalizeLogSegment(makeRI(5), 1, 15);
+    int newLayoutVersion = NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION - 1;
+    journal.startLogSegment(makeRI(6), 16, newLayoutVersion);
+    journal.journal(makeRI(7), 16, 16, 5, QJMTestUtil.createTxnData(16, 5));
+
+    assertJournaledEditsTxnCountAndContents(16, 10, 20, newLayoutVersion);
+  }
+
+  private void assertJournaledEditsTxnCountAndContents(int startTxn,
+      int requestedMaxTxns, int expectedEndTxn, int layoutVersion)
+      throws Exception {
+    GetJournaledEditsResponseProto result =
+        journal.getJournaledEdits(startTxn, requestedMaxTxns);
+    int expectedTxnCount = expectedEndTxn - startTxn + 1;
+    ByteArrayOutputStream headerBytes = new ByteArrayOutputStream();
+    EditLogFileOutputStream.writeHeader(layoutVersion,
+        new DataOutputStream(headerBytes));
+    assertEquals(expectedTxnCount, result.getTxnCount());
+    assertArrayEquals(
+        Bytes.concat(
+            headerBytes.toByteArray(),
+            QJMTestUtil.createTxnData(startTxn, expectedTxnCount)),
+        result.getEditLog().toByteArray());
+  }
 }
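
As a further illustration of the short-circuit in Journal#getJournaledEdits, a hypothetical extra test using the same fixtures as above (not part of this change): requesting transactions beyond the highest written txid should yield an empty response rather than an error.

    @Test
    public void testEmptyResponseBeyondHighestWrittenTxId() throws Exception {
      journal.newEpoch(FAKE_NSINFO, 1);
      journal.startLogSegment(makeRI(1), 1,
          NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
      journal.journal(makeRI(2), 1, 1, 5, QJMTestUtil.createTxnData(1, 5));
      // Only txids 1-5 have been written, so asking for edits starting at 6
      // short-circuits the cache and returns zero transactions.
      GetJournaledEditsResponseProto result = journal.getJournaledEdits(6, 10);
      assertEquals(0, result.getTxnCount());
    }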

+ 11 - 8
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournaledEditsCache.java

@@ -137,7 +137,7 @@ public class TestJournaledEditsCache {
     storeEdits(thirdCapacity * 4 + 1, thirdCapacity * 5);
 
     try {
-      cache.retrieveEdits(1, 10, new ArrayList<>());
+      cache.retrieveEdits(1, 10, new ArrayList<ByteBuffer>());
       fail();
     } catch (IOException ioe) {
       // expected
@@ -153,7 +153,7 @@ public class TestJournaledEditsCache {
     logs.stopCapturing();
     assertTrue(logs.getOutput().contains("batch of edits was too large"));
     try {
-      cache.retrieveEdits(1, 1, new ArrayList<>());
+      cache.retrieveEdits(1, 1, new ArrayList<ByteBuffer>());
       fail();
     } catch (IOException ioe) {
       // expected
@@ -188,7 +188,7 @@ public class TestJournaledEditsCache {
     // Ensure the cache will only return edits from a single
     // layout version at a time
     try {
-      cache.retrieveEdits(1, 50, new ArrayList<>());
+      cache.retrieveEdits(1, 50, new ArrayList<ByteBuffer>());
       fail("Expected a cache miss");
     } catch (JournaledEditsCache.CacheMissException cme) {
       // expected
@@ -202,7 +202,7 @@ public class TestJournaledEditsCache {
     storeEdits(10, 15);
 
     try {
-      cache.retrieveEdits(1, 20, new ArrayList<>());
+      cache.retrieveEdits(1, 20, new ArrayList<ByteBuffer>());
       fail();
     } catch (JournaledEditsCache.CacheMissException cme) {
       assertEquals(9, cme.getCacheMissAmount());
@@ -212,13 +212,13 @@ public class TestJournaledEditsCache {
 
   @Test(expected = JournaledEditsCache.CacheMissException.class)
   public void testReadUninitializedCache() throws Exception {
-    cache.retrieveEdits(1, 10, new ArrayList<>());
+    cache.retrieveEdits(1, 10, new ArrayList<ByteBuffer>());
   }
 
   @Test(expected = JournaledEditsCache.CacheMissException.class)
   public void testCacheMalformedInput() throws Exception {
     storeEdits(1, 1);
-    cache.retrieveEdits(-1, 10, new ArrayList<>());
+    cache.retrieveEdits(-1, 10, new ArrayList<ByteBuffer>());
   }
 
   private void storeEdits(int startTxn, int endTxn) throws Exception {
@@ -236,8 +236,11 @@ public class TestJournaledEditsCache {
     byte[] expectedBytes = Bytes.concat(
         getHeaderForLayoutVersion(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION),
         createTxnData(startTxn, expectedTxnCount));
-    byte[] actualBytes =
-        new byte[buffers.stream().mapToInt(ByteBuffer::remaining).sum()];
+    int length = 0;
+    for (ByteBuffer buffer : buffers) {
+      length += buffer.remaining();
+    }
+    byte[] actualBytes = new byte[length];
     int pos = 0;
     for (ByteBuffer buf : buffers) {
       System.arraycopy(buf.array(), buf.position(), actualBytes, pos,