Jelajahi Sumber

HDFS-5496. Make replication queue initialization asynchronous. Contributed by Vinay.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-5535@1552109 13f79535-47bb-0310-9956-ffa450edef68
Jing Zhao 11 tahun lalu
induk
melakukan
788fca4124

+ 7 - 2
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LightWeightGSet.java

@@ -244,13 +244,14 @@ public class LightWeightGSet<K, E extends K> implements GSet<K, E> {
     out.println("\n]");
   }
 
-  private class SetIterator implements Iterator<E> {
+  public class SetIterator implements Iterator<E> {
     /** The starting modification for fail-fast. */
     private int iterModification = modification;
     /** The current index of the entry array. */
     private int index = -1;
     private LinkedElement cur = null;
     private LinkedElement next = nextNonemptyEntry();
+    private boolean trackModification = true;
 
     /** Find the next nonempty entry starting at (index + 1). */
     private LinkedElement nextNonemptyEntry() {
@@ -259,7 +260,7 @@ public class LightWeightGSet<K, E extends K> implements GSet<K, E> {
     }
 
     private void ensureNext() {
-      if (modification != iterModification) {
+      if (trackModification && modification != iterModification) {
         throw new ConcurrentModificationException("modification=" + modification
             + " != iterModification = " + iterModification);
       }
@@ -304,6 +305,10 @@ public class LightWeightGSet<K, E extends K> implements GSet<K, E> {
       iterModification++;
       cur = null;
     }
+
+    public void setTrackModification(boolean trackModification) {
+      this.trackModification = trackModification;
+    }
   }
   
   /**

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -242,6 +242,9 @@ Trunk (Unreleased)
     HDFS-5629. Support HTTPS in JournalNode and SecondaryNameNode. 
     (Haohui Mai via jing9)
 
+    HDFS-5496. Make replication queue initialization asynchronous. (Vinay via
+    jing9)
+
   OPTIMIZATIONS
     HDFS-5349. DNA_CACHE and DNA_UNCACHE should be by blockId only. (cmccabe)
 

+ 3 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -398,7 +398,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int     DFS_BLOCK_INVALIDATE_LIMIT_DEFAULT = 1000;
   public static final String  DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED_KEY = "dfs.corruptfilesreturned.max";
   public static final int     DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED = 500;
-
+  /* Maximum number of blocks to process for initializing replication queues */
+  public static final String  DFS_BLOCK_MISREPLICATION_PROCESSING_LIMIT = "dfs.block.misreplication.processing.limit";
+  public static final int     DFS_BLOCK_MISREPLICATION_PROCESSING_LIMIT_DEFAULT = 10000;
   public static final String DFS_CLIENT_READ_SHORTCIRCUIT_KEY = "dfs.client.read.shortcircuit";
   public static final boolean DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT = false;
   public static final String DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY = "dfs.client.read.shortcircuit.skip.checksum";

+ 138 - 36
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -229,6 +229,22 @@ public class BlockManager {
    */
   private boolean shouldPostponeBlocksFromFuture = false;
 
+  /**
+   * Process replication queues asynchronously to allow namenode safemode exit
+   * and failover to be faster. HDFS-5496
+   */
+  private Daemon replicationQueuesInitializer = null;
+  /**
+   * Number of blocks to process asychronously for replication queues
+   * initialization once aquired the namesystem lock. Remaining blocks will be
+   * processed again after aquiring lock again.
+   */
+  private int numBlocksPerIteration;
+  /**
+   * Progress of the Replication queues initialisation.
+   */
+  private double replicationQueuesInitProgress = 0.0;
+
   /** for block replicas placement */
   private BlockPlacementPolicy blockplacement;
 
@@ -305,6 +321,9 @@ public class BlockManager {
     this.maxNumBlocksToLog =
         conf.getLong(DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_KEY,
             DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT);
+    this.numBlocksPerIteration = conf.getInt(
+        DFSConfigKeys.DFS_BLOCK_MISREPLICATION_PROCESSING_LIMIT,
+        DFSConfigKeys.DFS_BLOCK_MISREPLICATION_PROCESSING_LIMIT_DEFAULT);
     
     LOG.info("defaultReplication         = " + defaultReplication);
     LOG.info("maxReplication             = " + maxReplication);
@@ -2319,45 +2338,127 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
    */
   public void processMisReplicatedBlocks() {
     assert namesystem.hasWriteLock();
-
-    long nrInvalid = 0, nrOverReplicated = 0, nrUnderReplicated = 0, nrPostponed = 0,
-         nrUnderConstruction = 0;
+    stopReplicationInitializer();
     neededReplications.clear();
-    for (BlockInfo block : blocksMap.getBlocks()) {
-      MisReplicationResult res = processMisReplicatedBlock(block);
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("block " + block + ": " + res);
-      }
-      switch (res) {
-      case UNDER_REPLICATED:
-        nrUnderReplicated++;
-        break;
-      case OVER_REPLICATED:
-        nrOverReplicated++;
-        break;
-      case INVALID:
-        nrInvalid++;
-        break;
-      case POSTPONE:
-        nrPostponed++;
-        postponeBlock(block);
-        break;
-      case UNDER_CONSTRUCTION:
-        nrUnderConstruction++;
-        break;
-      case OK:
-        break;
-      default:
-        throw new AssertionError("Invalid enum value: " + res);
+    replicationQueuesInitializer = new Daemon() {
+
+      @Override
+      public void run() {
+        try {
+          processMisReplicatesAsync();
+        } catch (InterruptedException ie) {
+          LOG.info("Interrupted while processing replication queues.");
+        } catch (Exception e) {
+          LOG.error("Error while processing replication queues async", e);
+        }
+      }
+    };
+    replicationQueuesInitializer.setName("Replication Queue Initializer");
+    replicationQueuesInitializer.start();
+  }
+
+  /*
+   * Stop the ongoing initialisation of replication queues
+   */
+  private void stopReplicationInitializer() {
+    if (replicationQueuesInitializer != null) {
+      replicationQueuesInitializer.interrupt();
+      try {
+        replicationQueuesInitializer.join();
+      } catch (final InterruptedException e) {
+        LOG.warn("Interrupted while waiting for replicationQueueInitializer. Returning..");
+        return;
+      } finally {
+        replicationQueuesInitializer = null;
       }
     }
-    
-    LOG.info("Total number of blocks            = " + blocksMap.size());
-    LOG.info("Number of invalid blocks          = " + nrInvalid);
-    LOG.info("Number of under-replicated blocks = " + nrUnderReplicated);
-    LOG.info("Number of  over-replicated blocks = " + nrOverReplicated +
-        ((nrPostponed > 0) ? ( " (" + nrPostponed + " postponed)") : ""));
-    LOG.info("Number of blocks being written    = " + nrUnderConstruction);
+  }
+
+  /*
+   * Since the BlocksMapGset does not throw the ConcurrentModificationException
+   * and supports further iteration after modification to list, there is a
+   * chance of missing the newly added block while iterating. Since every
+   * addition to blocksMap will check for mis-replication, missing mis-replication
+   * check for new blocks will not be a problem.
+   */
+  private void processMisReplicatesAsync() throws InterruptedException {
+    long nrInvalid = 0, nrOverReplicated = 0;
+    long nrUnderReplicated = 0, nrPostponed = 0, nrUnderConstruction = 0;
+    long startTimeMisReplicatedScan = Time.now();
+    Iterator<BlockInfo> blocksItr = blocksMap.getBlocks().iterator();
+    long totalBlocks = blocksMap.size();
+    replicationQueuesInitProgress = 0;
+    long totalProcessed = 0;
+    while (namesystem.isRunning() && !Thread.currentThread().isInterrupted()) {
+      int processed = 0;
+      namesystem.writeLockInterruptibly();
+      try {
+        while (processed < numBlocksPerIteration && blocksItr.hasNext()) {
+          BlockInfo block = blocksItr.next();
+          MisReplicationResult res = processMisReplicatedBlock(block);
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("block " + block + ": " + res);
+          }
+          switch (res) {
+          case UNDER_REPLICATED:
+            nrUnderReplicated++;
+            break;
+          case OVER_REPLICATED:
+            nrOverReplicated++;
+            break;
+          case INVALID:
+            nrInvalid++;
+            break;
+          case POSTPONE:
+            nrPostponed++;
+            postponeBlock(block);
+            break;
+          case UNDER_CONSTRUCTION:
+            nrUnderConstruction++;
+            break;
+          case OK:
+            break;
+          default:
+            throw new AssertionError("Invalid enum value: " + res);
+          }
+          processed++;
+        }
+        totalProcessed += processed;
+        // there is a possibility that if any of the blocks deleted/added during
+        // initialisation, then progress might be different.
+        replicationQueuesInitProgress = Math.min((double) totalProcessed
+            / totalBlocks, 1.0);
+
+        if (!blocksItr.hasNext()) {
+          LOG.info("Total number of blocks            = " + blocksMap.size());
+          LOG.info("Number of invalid blocks          = " + nrInvalid);
+          LOG.info("Number of under-replicated blocks = " + nrUnderReplicated);
+          LOG.info("Number of  over-replicated blocks = " + nrOverReplicated
+              + ((nrPostponed > 0) ? (" (" + nrPostponed + " postponed)") : ""));
+          LOG.info("Number of blocks being written    = " + nrUnderConstruction);
+          NameNode.stateChangeLog
+              .info("STATE* Replication Queue initialization "
+                  + "scan for invalid, over- and under-replicated blocks "
+                  + "completed in " + (Time.now() - startTimeMisReplicatedScan)
+                  + " msec");
+          break;
+        }
+      } finally {
+        namesystem.writeUnlock();
+      }
+    }
+    if (Thread.currentThread().isInterrupted()) {
+      LOG.info("Interrupted while processing replication queues.");
+    }
+  }
+
+  /**
+   * Get the progress of the Replication queues initialisation
+   * 
+   * @return Returns values between 0 and 1 for the progress.
+   */
+  public double getReplicationQueuesInitProgress() {
+    return replicationQueuesInitProgress;
   }
 
   /**
@@ -3308,6 +3409,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
   }
 
   public void shutdown() {
+    stopReplicationInitializer();
     blocksMap.close();
   }
 }

+ 15 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java

@@ -22,6 +22,7 @@ import java.util.Iterator;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.util.GSet;
 import org.apache.hadoop.util.LightWeightGSet;
+import org.apache.hadoop.util.LightWeightGSet.SetIterator;
 
 /**
  * This class maintains the map from a block to its metadata.
@@ -62,7 +63,20 @@ class BlocksMap {
   BlocksMap(int capacity) {
     // Use 2% of total memory to size the GSet capacity
     this.capacity = capacity;
-    this.blocks = new LightWeightGSet<Block, BlockInfo>(capacity);
+    this.blocks = new LightWeightGSet<Block, BlockInfo>(capacity) {
+      @Override
+      public Iterator<BlockInfo> iterator() {
+        SetIterator iterator = new SetIterator();
+        /*
+         * Not tracking any modifications to set. As this set will be used
+         * always under FSNameSystem lock, modifications will not cause any
+         * ConcurrentModificationExceptions. But there is a chance of missing
+         * newly added elements during iteration.
+         */
+        iterator.setTrackModification(false);
+        return iterator;
+      }
+    };
   }
 
 

+ 28 - 43
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -482,7 +482,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   private HAContext haContext;
 
   private final boolean haEnabled;
-  
+
+  /** flag indicating whether replication queues have been initialized */
+  boolean initializedReplQueues = false;
+
   /**
    * Whether the namenode is in the middle of starting the active service
    */
@@ -914,8 +917,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     try {
       nnResourceChecker = new NameNodeResourceChecker(conf);
       checkAvailableResources();
-      assert safeMode != null &&
-        !safeMode.isPopulatingReplQueues();
+      assert safeMode != null && !isPopulatingReplQueues();
       StartupProgress prog = NameNode.getStartupProgress();
       prog.beginPhase(Phase.SAFEMODE);
       prog.setTotal(Phase.SAFEMODE, STEP_AWAITING_REPORTED_BLOCKS,
@@ -971,12 +973,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
         blockManager.clearQueues();
         blockManager.processAllPendingDNMessages();
 
-        if (!isInSafeMode() ||
-            (isInSafeMode() && safeMode.isPopulatingReplQueues())) {
+        // Only need to re-process the queue, If not in SafeMode.
+        if (!isInSafeMode()) {
           LOG.info("Reprocessing replication and invalidation queues");
-          blockManager.processMisReplicatedBlocks();
+          initializeReplQueues();
         }
-        
+
         if (LOG.isDebugEnabled()) {
           LOG.debug("NameNode metadata after re-processing " +
               "replication and invalidation queues during failover:\n" +
@@ -1015,7 +1017,16 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       startingActiveService = false;
     }
   }
-  
+
+  /**
+   * Initialize replication queues.
+   */
+  private void initializeReplQueues() {
+    LOG.info("initializing replication queues");
+    blockManager.processMisReplicatedBlocks();
+    initializedReplQueues = true;
+  }
+
   /**
    * @return Whether the namenode is transitioning to active state and is in the
    *         middle of the {@link #startActiveServices()}
@@ -1061,6 +1072,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       }
       cacheManager.deactivate();
       blockManager.getDatanodeManager().setShouldSendCachingCommands(false);
+      // Don't want to keep replication queues when not in Active.
+      blockManager.clearQueues();
+      initializedReplQueues = false;
     } finally {
       writeUnlock();
     }
@@ -4548,7 +4562,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     private int safeReplication;
     /** threshold for populating needed replication queues */
     private double replQueueThreshold;
-      
     // internal fields
     /** Time when threshold was reached.
      * <br> -1 safe mode is off
@@ -4566,8 +4579,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     private int blockReplQueueThreshold;
     /** time of the last status printout */
     private long lastStatusReport = 0;
-    /** flag indicating whether replication queues have been initialized */
-    boolean initializedReplQueues = false;
     /** Was safemode entered automatically because available resources were low. */
     private boolean resourcesLow = false;
     /** Should safemode adjust its block totals as blocks come in */
@@ -4627,7 +4638,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
      * 
      * @see SafeModeInfo
      */
-    private SafeModeInfo(boolean resourcesLow, boolean isReplQueuesInited) {
+    private SafeModeInfo(boolean resourcesLow) {
       this.threshold = 1.5f;  // this threshold can never be reached
       this.datanodeThreshold = Integer.MAX_VALUE;
       this.extension = Integer.MAX_VALUE;
@@ -4636,7 +4647,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       this.blockTotal = -1;
       this.blockSafe = -1;
       this.resourcesLow = resourcesLow;
-      this.initializedReplQueues = isReplQueuesInited;
       enter();
       reportStatus("STATE* Safe mode is ON.", true);
     }
@@ -4650,13 +4660,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       return this.reached >= 0;
     }
       
-    /**
-     * Check if we are populating replication queues.
-     */
-    private synchronized boolean isPopulatingReplQueues() {
-      return initializedReplQueues;
-    }
-
     /**
      * Enter safe mode.
      */
@@ -4703,21 +4706,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       }
     }
 
-    /**
-     * Initialize replication queues.
-     */
-    private synchronized void initializeReplQueues() {
-      LOG.info("initializing replication queues");
-      assert !isPopulatingReplQueues() : "Already initialized repl queues";
-      long startTimeMisReplicatedScan = now();
-      blockManager.processMisReplicatedBlocks();
-      initializedReplQueues = true;
-      NameNode.stateChangeLog.info("STATE* Replication Queue initialization "
-          + "scan for invalid, over- and under-replicated blocks "
-          + "completed in " + (now() - startTimeMisReplicatedScan)
-          + " msec");
-    }
-
     /**
      * Check whether we have reached the threshold for 
      * initializing replication queues.
@@ -4765,7 +4753,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       if (smmthread == null && needEnter()) {
         enter();
         // check if we are ready to initialize replication queues
-        if (canInitializeReplQueues() && !isPopulatingReplQueues()) {
+        if (canInitializeReplQueues() && !isPopulatingReplQueues()
+            && !haEnabled) {
           initializeReplQueues();
         }
         reportStatus("STATE* Safe mode ON.", false);
@@ -4790,7 +4779,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       }
 
       // check if we are ready to initialize replication queues
-      if (canInitializeReplQueues() && !isPopulatingReplQueues()) {
+      if (canInitializeReplQueues() && !isPopulatingReplQueues() && !haEnabled) {
         initializeReplQueues();
       }
     }
@@ -5100,11 +5089,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     if (!shouldPopulateReplQueues()) {
       return false;
     }
-    // safeMode is volatile, and may be set to null at any time
-    SafeModeInfo safeMode = this.safeMode;
-    if (safeMode == null)
-      return true;
-    return safeMode.isPopulatingReplQueues();
+    return initializedReplQueues;
   }
 
   private boolean shouldPopulateReplQueues() {
@@ -5224,7 +5209,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
         getEditLog().logSyncAll();
       }
       if (!isInSafeMode()) {
-        safeMode = new SafeModeInfo(resourcesLow, isPopulatingReplQueues());
+        safeMode = new SafeModeInfo(resourcesLow);
         return;
       }
       if (resourcesLow) {

+ 2 - 7
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java

@@ -228,15 +228,10 @@ public class NameNodeAdapter {
   }
   
   /**
-   * @return true if safemode is not running, or if safemode has already
-   * initialized the replication queues
+   * @return Replication queue initialization status
    */
   public static boolean safeModeInitializedReplQueues(NameNode nn) {
-    SafeModeInfo smi = nn.getNamesystem().getSafeModeInfoForTests();
-    if (smi == null) {
-      return true;
-    }
-    return smi.initializedReplQueues;
+    return nn.getNamesystem().isPopulatingReplQueues();
   }
   
   public static File getInProgressEditsFile(StorageDirectory sd, long startTxId) {