ソースを参照

HDFS-16771. Follow-up for HDFS-16659: JN should tersely print logs about NewerTxnIdException (#4882)

Signed-off-by: Erik Krogen <xkrogen@apache.org>
Co-authored-by: zengqiang.xu <zengqiang.xu@shopee.com>
ZanderXu 2 年 前
コミット
43c1ebae16

+ 0 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java

@@ -31,7 +31,6 @@ import java.util.PriorityQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.TimeoutException;
 
 
-import org.apache.hadoop.hdfs.qjournal.server.NewerTxnIdException;
 import org.apache.hadoop.util.Lists;
 import org.apache.hadoop.util.Lists;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
@@ -524,9 +523,6 @@ public class QuorumJournalManager implements JournalManager {
         selectRpcInputStreams(rpcStreams, fromTxnId, onlyDurableTxns);
         selectRpcInputStreams(rpcStreams, fromTxnId, onlyDurableTxns);
         streams.addAll(rpcStreams);
         streams.addAll(rpcStreams);
         return;
         return;
-      } catch (NewerTxnIdException ntie) {
-        // normal situation, we requested newer IDs than any journal has. no new streams
-        return;
       } catch (IOException ioe) {
       } catch (IOException ioe) {
         LOG.warn("Encountered exception while tailing edits >= " + fromTxnId +
         LOG.warn("Encountered exception while tailing edits >= " + fromTxnId +
             " via RPC; falling back to streaming.", ioe);
             " via RPC; falling back to streaming.", ioe);

+ 12 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java

@@ -751,8 +751,18 @@ public class Journal implements Closeable {
           "it via " + DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY);
           "it via " + DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY);
     }
     }
     long highestTxId = getHighestWrittenTxId();
     long highestTxId = getHighestWrittenTxId();
-    if (sinceTxId > highestTxId) {
-      // Requested edits that don't exist yet and is newer than highestTxId.
+    if (sinceTxId == highestTxId + 1) {
+      // Requested edits that don't exist yet, but this is expected,
+      // because namenode always get the journaled edits with the sinceTxId
+      // equal to image.getLastAppliedTxId() + 1. Short-circuiting the cache here
+      // and returning a response with a count of 0.
+      metrics.rpcEmptyResponses.incr();
+      return GetJournaledEditsResponseProto.newBuilder().setTxnCount(0).build();
+    } else if (sinceTxId > highestTxId + 1) {
+      // Requested edits that don't exist yet and this is unexpected. Means that there is a lag
+      // in this journal that does not contain some edits that should exist.
+      // Throw one NewerTxnIdException to make namenode treat this response as an exception.
+      // More detailed info please refer to: HDFS-16659 and HDFS-16771.
       metrics.rpcEmptyResponses.incr();
       metrics.rpcEmptyResponses.incr();
       throw new NewerTxnIdException(
       throw new NewerTxnIdException(
           "Highest txn ID available in the journal is %d, but requested txns starting at %d.",
           "Highest txn ID available in the journal is %d, but requested txns starting at %d.",

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java

@@ -114,6 +114,8 @@ public class JournalNodeRpcServer implements QJournalProtocol,
         .setVerbose(false)
         .setVerbose(false)
         .build();
         .build();
 
 
+    this.server.addTerseExceptions(NewerTxnIdException.class);
+    this.server.addTerseExceptions(JournaledEditsCache.CacheMissException.class);
 
 
     //Adding InterQJournalProtocolPB to server
     //Adding InterQJournalProtocolPB to server
     InterQJournalProtocolServerSideTranslatorPB
     InterQJournalProtocolServerSideTranslatorPB

+ 37 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java

@@ -42,8 +42,10 @@ import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 
 
 import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.hdfs.server.common.Util;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
 import org.apache.hadoop.net.MockDomainNameResolver;
 import org.apache.hadoop.net.MockDomainNameResolver;
 import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture;
 import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture;
 import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.SettableFuture;
 import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.SettableFuture;
@@ -1263,4 +1265,39 @@ public class TestQuorumJournalManager {
           segmentTxId);
           segmentTxId);
     }
     }
   }
   }
+
+  @Test
+  public void testSelectLatestEditsWithoutStreaming() throws Exception {
+    EditLogOutputStream stm = qjm.startLogSegment(
+        1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
+    // Successfully write these edits to JN0 ~ JN2
+    writeTxns(stm, 1, 10);
+
+    AtomicInteger atomicInteger = new AtomicInteger(0);
+    spyGetEditLogManifest(0, 11, true, atomicInteger::incrementAndGet);
+    spyGetEditLogManifest(1, 11, true, atomicInteger::incrementAndGet);
+    spyGetEditLogManifest(2, 11, true, atomicInteger::incrementAndGet);
+
+    List<EditLogInputStream> streams = new ArrayList<>();
+    qjm.selectInputStreams(streams, 1, true, true);
+    assertEquals(1, streams.size());
+    assertEquals(1, streams.get(0).getFirstTxId());
+    assertEquals(10, streams.get(0).getLastTxId());
+
+    streams.clear();
+    qjm.selectInputStreams(streams, 11, true, true);
+    assertEquals(0, streams.size());
+    assertEquals(0, atomicInteger.get());
+  }
+
+  private void spyGetEditLogManifest(int jnSpyIdx, long fromTxId,
+      boolean inProgressOk, Runnable preHook) {
+    Mockito.doAnswer((Answer<ListenableFuture<RemoteEditLogManifest>>) invocation -> {
+      preHook.run();
+      @SuppressWarnings("unchecked")
+      ListenableFuture<RemoteEditLogManifest> result =
+          (ListenableFuture<RemoteEditLogManifest>) invocation.callRealMethod();
+      return result;
+    }).when(spies.get(jnSpyIdx)).getEditLogManifest(fromTxId, inProgressOk);
+  }
 }
 }