HDFS-12978. Fine-grained locking while consuming journal stream. Contributed by Konstantin Shvachko.

Konstantin V Shvachko, 7 years ago
parent
commit 3d7e345823
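
What the patch does, in one sketch: instead of holding the namesystem write lock for an entire edit-log segment, the StandbyNode applies at most a configurable number of journal transactions per lock acquisition and releases the lock between batches so read requests can get through. The snippet below is a minimal, self-contained illustration of that pattern with made-up names; it is not the Hadoop code itself, which follows in the diffs.

    import java.util.Iterator;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    class BatchedEditApplier {
      private final ReentrantReadWriteLock nsLock = new ReentrantReadWriteLock();

      long applyAll(Iterator<Runnable> edits, long maxTxnsPerLock) {
        long total = 0;
        while (edits.hasNext()) {
          nsLock.writeLock().lock();
          try {
            long inThisBatch = 0;
            while (edits.hasNext()) {
              edits.next().run();            // apply one journal transaction
              total++;
              if (++inThisBatch >= maxTxnsPerLock) {
                break;                       // cap reached: end this batch
              }
            }
          } finally {
            nsLock.writeLock().unlock();     // let queued readers in before the next batch
          }
        }
        return total;
      }
    }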

+ 19 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java

@@ -138,7 +138,7 @@ public class FSEditLogLoader {
   
   long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId)
       throws IOException {
-    return loadFSEdits(edits, expectedStartingTxId, null, null);
+    return loadFSEdits(edits, expectedStartingTxId, Long.MAX_VALUE, null, null);
   }
 
   /**
@@ -147,6 +147,7 @@ public class FSEditLogLoader {
    * along.
    */
   long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId,
+      long maxTxnsToRead,
       StartupOption startOpt, MetaRecoveryContext recovery) throws IOException {
     StartupProgress prog = NameNode.getStartupProgress();
     Step step = createStartupProgressStep(edits);
@@ -154,9 +155,10 @@ public class FSEditLogLoader {
     fsNamesys.writeLock();
     try {
       long startTime = monotonicNow();
-      FSImage.LOG.info("Start loading edits file " + edits.getName());
+      FSImage.LOG.info("Start loading edits file " + edits.getName()
+          + " maxTxnsToRead = " + maxTxnsToRead);
       long numEdits = loadEditRecords(edits, false, expectedStartingTxId,
-          startOpt, recovery);
+          maxTxnsToRead, startOpt, recovery);
       FSImage.LOG.info("Edits file " + edits.getName() 
           + " of size " + edits.length() + " edits # " + numEdits 
           + " loaded in " + (monotonicNow()-startTime)/1000 + " seconds");
@@ -171,8 +173,13 @@ public class FSEditLogLoader {
   long loadEditRecords(EditLogInputStream in, boolean closeOnExit,
       long expectedStartingTxId, StartupOption startOpt,
       MetaRecoveryContext recovery) throws IOException {
-    FSDirectory fsDir = fsNamesys.dir;
+    return loadEditRecords(in, closeOnExit, expectedStartingTxId,
+        Long.MAX_VALUE, startOpt, recovery);
+  }
 
+  long loadEditRecords(EditLogInputStream in, boolean closeOnExit,
+      long expectedStartingTxId, long maxTxnsToRead, StartupOption startOpt,
+      MetaRecoveryContext recovery) throws IOException {
     EnumMap<FSEditLogOpCodes, Holder<Integer>> opCounts =
       new EnumMap<FSEditLogOpCodes, Holder<Integer>>(FSEditLogOpCodes.class);
 
@@ -181,6 +188,7 @@ public class FSEditLogLoader {
     }
 
     fsNamesys.writeLock();
+    FSDirectory fsDir = fsNamesys.dir;
     fsDir.writeLock();
 
     long recentOpcodeOffsets[] = new long[4];
@@ -285,6 +293,9 @@ public class FSEditLogLoader {
           }
           numEdits++;
           totalEdits++;
+          if(numEdits >= maxTxnsToRead) {
+            break;
+          }
         } catch (RollingUpgradeOp.RollbackException e) {
           LOG.info("Stopped at OP_START_ROLLING_UPGRADE for rollback.");
           break;
@@ -308,7 +319,11 @@ public class FSEditLogLoader {
 
       if (FSImage.LOG.isDebugEnabled()) {
         dumpOpCounts(opCounts);
+        FSImage.LOG.debug("maxTxnsToRead = " + maxTxnsToRead
+            + " actual edits read = " + numEdits);
       }
+      assert numEdits <= maxTxnsToRead || numEdits == 1 :
+        "should read at least one txn, but not more than the configured max";
     }
     return numEdits;
   }
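
The key change above is the placement of the new bound: the check runs after an edit has been applied, so every call makes progress on a non-empty stream and never applies more than maxTxnsToRead edits, which is exactly what the new assert states. A standalone illustration of that loop shape (not Hadoop code, illustrative names only):

    import java.util.Iterator;
    import java.util.List;

    class BoundedReadDemo {
      static long readBatch(Iterator<String> ops, long maxTxnsToRead) {
        long numEdits = 0;
        while (ops.hasNext()) {
          ops.next();                        // stands in for applying one edit op
          numEdits++;
          if (numEdits >= maxTxnsToRead) {
            break;                           // same placement as the check in the patch
          }
        }
        assert numEdits <= maxTxnsToRead || numEdits == 1
            : "should read at least one txn, but not more than the configured max";
        return numEdits;
      }

      public static void main(String[] args) {
        Iterator<String> ops = List.of("op1", "op2", "op3", "op4", "op5").iterator();
        System.out.println(readBatch(ops, 3));   // 3
        System.out.println(readBatch(ops, 3));   // 2 (the remainder)
      }
    }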

+ 10 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java

@@ -742,7 +742,8 @@ public class FSImage implements Closeable {
     prog.endPhase(Phase.LOADING_FSIMAGE);
     
     if (!rollingRollback) {
-      long txnsAdvanced = loadEdits(editStreams, target, startOpt, recovery);
+      long txnsAdvanced = loadEdits(editStreams, target, Long.MAX_VALUE,
+          startOpt, recovery);
       needToSave |= needsResaveBasedOnStaleCheckpoint(imageFile.getFile(),
           txnsAdvanced);
     } else {
@@ -866,11 +867,12 @@ public class FSImage implements Closeable {
    */
   public long loadEdits(Iterable<EditLogInputStream> editStreams,
       FSNamesystem target) throws IOException {
-    return loadEdits(editStreams, target, null, null);
+    return loadEdits(editStreams, target, Long.MAX_VALUE, null, null);
   }
 
-  private long loadEdits(Iterable<EditLogInputStream> editStreams,
-      FSNamesystem target, StartupOption startOpt, MetaRecoveryContext recovery)
+  public long loadEdits(Iterable<EditLogInputStream> editStreams,
+      FSNamesystem target, long maxTxnsToRead,
+      StartupOption startOpt, MetaRecoveryContext recovery)
       throws IOException {
     LOG.debug("About to load edits:\n  " + Joiner.on("\n  ").join(editStreams));
     StartupProgress prog = NameNode.getStartupProgress();
@@ -885,14 +887,16 @@ public class FSImage implements Closeable {
         LOG.info("Reading " + editIn + " expecting start txid #" +
               (lastAppliedTxId + 1));
         try {
-          loader.loadFSEdits(editIn, lastAppliedTxId + 1, startOpt, recovery);
+          loader.loadFSEdits(editIn, lastAppliedTxId + 1, maxTxnsToRead,
+              startOpt, recovery);
         } finally {
           // Update lastAppliedTxId even in case of error, since some ops may
           // have been successfully applied before the error.
           lastAppliedTxId = loader.getLastAppliedTxId();
         }
         // If we are in recovery mode, we may have skipped over some txids.
-        if (editIn.getLastTxId() != HdfsServerConstants.INVALID_TXID) {
+        if (editIn.getLastTxId() != HdfsServerConstants.INVALID_TXID
+            && recovery != null) {
           lastAppliedTxId = editIn.getLastTxId();
         }
       }
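
The added "recovery != null" guard matters once reads can be bounded: the loader may now stop before the end of a stream, so lastAppliedTxId has to come from the loader itself; only recovery mode, which may deliberately skip transactions, is allowed to jump ahead to the stream's reported last txid. A hypothetical helper (illustrative names, not part of the patch) stating that rule:

    class LastAppliedTxIdRule {
      // Stand-in for HdfsServerConstants.INVALID_TXID; the exact value is irrelevant here.
      static final long INVALID_TXID = -12345;

      static long advance(long loaderLastApplied, long streamLastTxId, boolean inRecovery) {
        if (inRecovery && streamLastTxId != INVALID_TXID) {
          return streamLastTxId;    // recovery may have skipped txns on purpose
        }
        return loaderLastApplied;   // never claim more than was actually applied
      }
    }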

+ 25 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java

@@ -73,7 +73,19 @@ import com.google.common.base.Preconditions;
 @InterfaceStability.Evolving
 public class EditLogTailer {
   public static final Log LOG = LogFactory.getLog(EditLogTailer.class);
-  
+
+  /**
+   * StandbyNode will hold namesystem lock to apply at most this many journal
+   * transactions.
+   * It will then release the lock and re-acquire it to load more transactions.
+   * By default the write lock is held for the entire journal segment.
+   * Fine-grained locking allows read requests to get through.
+   */
+  public static final String  DFS_HA_TAILEDITS_MAX_TXNS_PER_LOCK_KEY =
+      "dfs.ha.tail-edits.max-txns-per-lock";
+  public static final long DFS_HA_TAILEDITS_MAX_TXNS_PER_LOCK_DEFAULT =
+      Long.MAX_VALUE;
+
   private final EditLogTailerThread tailerThread;
   
   private final Configuration conf;
@@ -138,6 +150,12 @@ public class EditLogTailer {
    */
   private final boolean inProgressOk;
 
+  /**
+   * Release the namesystem lock after loading this many transactions.
+   * Then re-acquire the lock to load more edits.
+   */
+  private final long maxTxnsPerLock;
+
   public EditLogTailer(FSNamesystem namesystem, Configuration conf) {
     this.tailerThread = new EditLogTailerThread();
     this.conf = conf;
@@ -198,6 +216,10 @@ public class EditLogTailer {
         DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY,
         DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_DEFAULT);
 
+    this.maxTxnsPerLock = conf.getLong(
+        DFS_HA_TAILEDITS_MAX_TXNS_PER_LOCK_KEY,
+        DFS_HA_TAILEDITS_MAX_TXNS_PER_LOCK_DEFAULT);
+
     nnCount = nns.size();
     // setup the iterator to endlessly loop the nns
     this.nnLookup = Iterators.cycle(nns);
@@ -290,7 +312,8 @@ public class EditLogTailer {
       // disk are ignored.
       long editsLoaded = 0;
       try {
-        editsLoaded = image.loadEdits(streams, namesystem);
+        editsLoaded = image.loadEdits(
+            streams, namesystem, maxTxnsPerLock, null, null);
       } catch (EditLogInputException elie) {
         editsLoaded = elie.getNumEditsLoaded();
         throw elie;
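
To turn the feature on, a Standby NameNode sets the new key to a finite batch size; the default of Long.MAX_VALUE keeps the old whole-segment behaviour. A usage sketch mirroring the test below (the value 1000 is an arbitrary illustration, not a recommendation); the same key, dfs.ha.tail-edits.max-txns-per-lock, can equally be set in hdfs-site.xml:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.HdfsConfiguration;
    import org.apache.hadoop.hdfs.server.namenode.ha.EditLogTailer;

    class FineGrainedTailingConfig {
      static Configuration create() {
        Configuration conf = new HdfsConfiguration();
        // Apply at most 1000 journal transactions per namesystem write-lock hold.
        conf.setLong(EditLogTailer.DFS_HA_TAILEDITS_MAX_TXNS_PER_LOCK_KEY, 1000L);
        return conf;
      }
    }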

+ 10 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java

@@ -98,8 +98,9 @@ public class TestEditLogTailer {
   public void testTailer() throws IOException, InterruptedException,
       ServiceFailedException {
     Configuration conf = getConf();
-    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
+    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 0);
     conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_ALL_NAMESNODES_RETRY_KEY, 100);
+    conf.setLong(EditLogTailer.DFS_HA_TAILEDITS_MAX_TXNS_PER_LOCK_KEY, 3);
 
     HAUtil.setAllowStandbyReads(conf, true);
     
@@ -121,7 +122,10 @@ public class TestEditLogTailer {
       }
       
       HATestUtil.waitForStandbyToCatchUp(nn1, nn2);
-      
+      assertEquals("Inconsistent number of applied txns on Standby",
+          nn1.getNamesystem().getEditLog().getLastWrittenTxId(),
+          nn2.getNamesystem().getFSImage().getLastAppliedTxId() + 1);
+
       for (int i = 0; i < DIRS_TO_MAKE / 2; i++) {
         assertTrue(NameNodeAdapter.getFileInfo(nn2,
             getDirPath(i), false).isDirectory());
@@ -134,7 +138,10 @@ public class TestEditLogTailer {
       }
       
       HATestUtil.waitForStandbyToCatchUp(nn1, nn2);
-      
+      assertEquals("Inconsistent number of applied txns on Standby",
+          nn1.getNamesystem().getEditLog().getLastWrittenTxId(),
+          nn2.getNamesystem().getFSImage().getLastAppliedTxId() + 1);
+
       for (int i = DIRS_TO_MAKE / 2; i < DIRS_TO_MAKE; i++) {
         assertTrue(NameNodeAdapter.getFileInfo(nn2,
             getDirPath(i), false).isDirectory());