Procházet zdrojové kódy

HDFS-8865. Improve quota initialization performance. Contributed by Kihwal Lee.

Kihwal Lee před 9 roky
rodič
revize
b6ceee9bf4

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -853,6 +853,8 @@ Release 2.8.0 - UNRELEASED
     HDFS-8962. Clean up checkstyle warnings in o.a.h.hdfs.DfsClientConf.
     (Mingliang Liu via wheat9)
 
+    HDFS-8865. Improve quota initialization performance. (kihwal)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -214,6 +214,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
 
   public static final String  DFS_NAMENODE_EDITS_DIR_MINIMUM_KEY = "dfs.namenode.edits.dir.minimum";
   public static final int     DFS_NAMENODE_EDITS_DIR_MINIMUM_DEFAULT = 1;
+  public static final String  DFS_NAMENODE_QUOTA_INIT_THREADS_KEY = "dfs.namenode.quota.init-threads";
+  public static final int     DFS_NAMENODE_QUOTA_INIT_THREADS_DEFAULT = 4;
 
   public static final String  DFS_NAMENODE_EDIT_LOG_AUTOROLL_MULTIPLIER_THRESHOLD = "dfs.namenode.edit.log.autoroll.multiplier.threshold";
   public static final float   DFS_NAMENODE_EDIT_LOG_AUTOROLL_MULTIPLIER_THRESHOLD_DEFAULT = 2.0f;

+ 7 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java

@@ -24,6 +24,7 @@ import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
@@ -82,6 +83,8 @@ public class BackupImage extends FSImage {
   
   private FSNamesystem namesystem;
 
+  private int quotaInitThreads;
+
   /**
    * Construct a backup image.
    * @param conf Configuration
@@ -91,6 +94,9 @@ public class BackupImage extends FSImage {
     super(conf);
     storage.setDisablePreUpgradableLayoutCheck(true);
     bnState = BNState.DROP_UNTIL_NEXT_ROLL;
+    quotaInitThreads = conf.getInt(
+        DFSConfigKeys.DFS_NAMENODE_QUOTA_INIT_THREADS_KEY,
+        DFSConfigKeys.DFS_NAMENODE_QUOTA_INIT_THREADS_DEFAULT);
   }
 
   synchronized FSNamesystem getNamesystem() {
@@ -218,7 +224,7 @@ public class BackupImage extends FSImage {
 
       FSImage.updateCountForQuota(
           getNamesystem().dir.getBlockStoragePolicySuite(),
-          getNamesystem().dir.rootDir); // inefficient!
+          getNamesystem().dir.rootDir, quotaInitThreads);
     } finally {
       backupInputStream.clear();
     }

+ 100 - 53
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java

@@ -27,6 +27,8 @@ import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.RecursiveAction;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -70,6 +72,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.util.Canceler;
 import org.apache.hadoop.hdfs.util.EnumCounters;
 import org.apache.hadoop.hdfs.util.MD5FileUtils;
+import org.apache.hadoop.hdfs.util.ReadOnlyList;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.util.Time;
 
@@ -100,6 +103,7 @@ public class FSImage implements Closeable {
   final private Configuration conf;
 
   protected NNStorageRetentionManager archivalManager;
+  private int quotaInitThreads;
 
   /* Used to make sure there are no concurrent checkpoints for a given txid
    * The checkpoint here could be one of the following operations.
@@ -143,6 +147,10 @@ public class FSImage implements Closeable {
       storage.setRestoreFailedStorage(true);
     }
 
+    this.quotaInitThreads = conf.getInt(
+        DFSConfigKeys.DFS_NAMENODE_QUOTA_INIT_THREADS_KEY,
+        DFSConfigKeys.DFS_NAMENODE_QUOTA_INIT_THREADS_DEFAULT);
+
     this.editLog = new FSEditLog(conf, storage, editsDirs);
     
     archivalManager = new NNStorageRetentionManager(conf, storage, editLog);
@@ -847,7 +855,7 @@ public class FSImage implements Closeable {
       FSEditLog.closeAllStreams(editStreams);
       // update the counts
       updateCountForQuota(target.getBlockManager().getStoragePolicySuite(),
-          target.dir.rootDir);
+          target.dir.rootDir, quotaInitThreads);
     }
     prog.endPhase(Phase.LOADING_EDITS);
     return lastAppliedTxId - prevLastAppliedTxId;
@@ -862,65 +870,104 @@ public class FSImage implements Closeable {
    * throw QuotaExceededException.
    */
   static void updateCountForQuota(BlockStoragePolicySuite bsps,
-                                  INodeDirectory root) {
-    updateCountForQuotaRecursively(bsps, root.getStoragePolicyID(), root,
-        new QuotaCounts.Builder().build());
- }
-
-  private static void updateCountForQuotaRecursively(BlockStoragePolicySuite bsps,
-      byte blockStoragePolicyId, INodeDirectory dir, QuotaCounts counts) {
-    final long parentNamespace = counts.getNameSpace();
-    final long parentStoragespace = counts.getStorageSpace();
-    final EnumCounters<StorageType> parentTypeSpaces = counts.getTypeSpaces();
-
-    dir.computeQuotaUsage4CurrentDirectory(bsps, blockStoragePolicyId, counts);
-    
-    for (INode child : dir.getChildrenList(Snapshot.CURRENT_STATE_ID)) {
-      final byte childPolicyId = child.getStoragePolicyIDForQuota(blockStoragePolicyId);
-      if (child.isDirectory()) {
-        updateCountForQuotaRecursively(bsps, childPolicyId,
-            child.asDirectory(), counts);
-      } else {
-        // file or symlink: count here to reduce recursive calls.
-        counts.add(child.computeQuotaUsage(bsps, childPolicyId, false,
-            Snapshot.CURRENT_STATE_ID));
-      }
-    }
-      
-    if (dir.isQuotaSet()) {
-      // check if quota is violated. It indicates a software bug.
-      final QuotaCounts q = dir.getQuotaCounts();
-
-      final long namespace = counts.getNameSpace() - parentNamespace;
-      final long nsQuota = q.getNameSpace();
-      if (Quota.isViolated(nsQuota, namespace)) {
-        LOG.warn("Namespace quota violation in image for "
-            + dir.getFullPathName()
-            + " quota = " + nsQuota + " < consumed = " + namespace);
-      }
+      INodeDirectory root, int threads) {
+    threads = (threads < 1) ? 1 : threads;
+    LOG.info("Initializing quota with " + threads + " thread(s)");
+    long start = Time.now();
+    QuotaCounts counts = new QuotaCounts.Builder().build();
+    ForkJoinPool p = new ForkJoinPool(threads);
+    RecursiveAction task = new InitQuotaTask(bsps, root.getStoragePolicyID(),
+        root, counts);
+    p.execute(task);
+    task.join();
+    LOG.info("Quota initialization completed in " + (Time.now() - start) +
+        " milliseconds\n" + counts);
+  }
 
-      final long ssConsumed = counts.getStorageSpace() - parentStoragespace;
-      final long ssQuota = q.getStorageSpace();
-      if (Quota.isViolated(ssQuota, ssConsumed)) {
-        LOG.warn("Storagespace quota violation in image for "
-            + dir.getFullPathName()
-            + " quota = " + ssQuota + " < consumed = " + ssConsumed);
+  /**
+   * parallel initialization using fork-join.
+   */
+  private static class InitQuotaTask extends RecursiveAction {
+    private final INodeDirectory dir;
+    private final QuotaCounts counts;
+    private final BlockStoragePolicySuite bsps;
+    private final byte blockStoragePolicyId;
+
+    public InitQuotaTask(BlockStoragePolicySuite bsps,
+        byte blockStoragePolicyId, INodeDirectory dir, QuotaCounts counts) {
+      this.dir = dir;
+      this.counts = counts;
+      this.bsps = bsps;
+      this.blockStoragePolicyId = blockStoragePolicyId;
+    }
+
+    public void compute() {
+      QuotaCounts myCounts =  new QuotaCounts.Builder().build();
+      dir.computeQuotaUsage4CurrentDirectory(bsps, blockStoragePolicyId,
+          myCounts);
+
+      ReadOnlyList<INode> children =
+          dir.getChildrenList(Snapshot.CURRENT_STATE_ID);
+
+      if (children.size() > 0) {
+        List<InitQuotaTask> subtasks = new ArrayList<InitQuotaTask>();
+        for (INode child : children) {
+          final byte childPolicyId =
+              child.getStoragePolicyIDForQuota(blockStoragePolicyId);
+          if (child.isDirectory()) {
+            subtasks.add(new InitQuotaTask(bsps, childPolicyId,
+                child.asDirectory(), myCounts));
+          } else {
+            // file or symlink. count using the local counts variable
+            myCounts.add(child.computeQuotaUsage(bsps, childPolicyId, false,
+                Snapshot.CURRENT_STATE_ID));
+          }
+        }
+        // invoke and wait for completion
+        invokeAll(subtasks);
       }
 
-      final EnumCounters<StorageType> typeSpaces = counts.getTypeSpaces();
-      for (StorageType t : StorageType.getTypesSupportingQuota()) {
-        final long typeSpace = typeSpaces.get(t) - parentTypeSpaces.get(t);
-        final long typeQuota = q.getTypeSpaces().get(t);
-        if (Quota.isViolated(typeQuota, typeSpace)) {
-          LOG.warn("Storage type quota violation in image for "
+      if (dir.isQuotaSet()) {
+        // check if quota is violated. It indicates a software bug.
+        final QuotaCounts q = dir.getQuotaCounts();
+
+        final long nsConsumed = myCounts.getNameSpace();
+        final long nsQuota = q.getNameSpace();
+        if (Quota.isViolated(nsQuota, nsConsumed)) {
+          LOG.warn("Namespace quota violation in image for "
+              + dir.getFullPathName()
+              + " quota = " + nsQuota + " < consumed = " + nsConsumed);
+        }
+
+        final long ssConsumed = myCounts.getStorageSpace();
+        final long ssQuota = q.getStorageSpace();
+        if (Quota.isViolated(ssQuota, ssConsumed)) {
+          LOG.warn("Storagespace quota violation in image for "
               + dir.getFullPathName()
-              + " type = " + t.toString() + " quota = "
-              + typeQuota + " < consumed " + typeSpace);
+              + " quota = " + ssQuota + " < consumed = " + ssConsumed);
         }
+
+        final EnumCounters<StorageType> tsConsumed = myCounts.getTypeSpaces();
+        for (StorageType t : StorageType.getTypesSupportingQuota()) {
+          final long typeSpace = tsConsumed.get(t);
+          final long typeQuota = q.getTypeSpaces().get(t);
+          if (Quota.isViolated(typeQuota, typeSpace)) {
+            LOG.warn("Storage type quota violation in image for "
+                + dir.getFullPathName()
+                + " type = " + t.toString() + " quota = "
+                + typeQuota + " < consumed " + typeSpace);
+          }
+        }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Setting quota for " + dir + "\n" + myCounts);
+        }
+        dir.getDirectoryWithQuotaFeature().setSpaceConsumed(nsConsumed,
+            ssConsumed, tsConsumed);
       }
 
-      dir.getDirectoryWithQuotaFeature().setSpaceConsumed(namespace, ssConsumed,
-          typeSpaces);
+      synchronized(counts) {
+        counts.add(myCounts);
+      }
     }
   }
 

+ 9 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/QuotaCounts.java

@@ -159,6 +159,13 @@ public class QuotaCounts {
     return tsCounts.anyGreaterOrEqual(val);
   }
 
+  @Override
+  public String toString() {
+    return "name space=" + getNameSpace() +
+        "\nstorage space=" + getStorageSpace() +
+        "\nstorage types=" + getTypeSpaces();
+  }
+
   @Override
   public boolean equals(Object obj) {
     if (obj == this) {
@@ -176,4 +183,5 @@ public class QuotaCounts {
     assert false : "hashCode not designed";
     return 42; // any arbitrary constant will do
   }
-}
+
+}

+ 10 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

@@ -2402,4 +2402,14 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.namenode.quota.init-threads</name>
+  <value>4</value>
+  <description>
+    The number of concurrent threads to be used in quota initialization. The
+    speed of quota initialization also affects the namenode fail-over latency.
+    If the size of name space is big, try increasing this.
+  </description>
+</property>
+
 </configuration>

+ 64 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDiskspaceQuotaUpdate.java

@@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.util.EnumSet;
+import java.util.HashMap;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ContentSummary;
@@ -36,6 +37,7 @@ import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.ipc.RemoteException;
 import org.junit.After;
 import org.junit.Assert;
@@ -310,4 +312,66 @@ public class TestDiskspaceQuotaUpdate {
     dfs.recoverLease(file);
     cluster.restartNameNodes();
   }
+
+  /**
+   * Check whether the quota is initialized correctly.
+   */
+  @Test
+  public void testQuotaInitialization() throws Exception {
+    final int size = 500;
+    Path testDir = new Path("/testDir");
+    long expectedSize = 3 * BLOCKSIZE + BLOCKSIZE/2;
+    dfs.mkdirs(testDir);
+    dfs.setQuota(testDir, size*4, expectedSize*size*2);
+
+    Path[] testDirs = new Path[size];
+    for (int i = 0; i < size; i++) {
+      testDirs[i] = new Path(testDir, "sub" + i);
+      dfs.mkdirs(testDirs[i]);
+      dfs.setQuota(testDirs[i], 100, 1000000);
+      DFSTestUtil.createFile(dfs, new Path(testDirs[i], "a"), expectedSize,
+          (short)1, 1L);
+    }
+
+    // Directly access the name system to obtain the current cached usage.
+    INodeDirectory root = fsdir.getRoot();
+    HashMap<String, Long> nsMap = new HashMap<String, Long>();
+    HashMap<String, Long> dsMap = new HashMap<String, Long>();
+    scanDirsWithQuota(root, nsMap, dsMap, false);
+
+    FSImage.updateCountForQuota(
+        fsdir.getBlockManager().getStoragePolicySuite(), root, 1);
+    scanDirsWithQuota(root, nsMap, dsMap, true);
+
+    FSImage.updateCountForQuota(
+        fsdir.getBlockManager().getStoragePolicySuite(), root, 2);
+    scanDirsWithQuota(root, nsMap, dsMap, true);
+
+    FSImage.updateCountForQuota(
+        fsdir.getBlockManager().getStoragePolicySuite(), root, 4);
+    scanDirsWithQuota(root, nsMap, dsMap, true);
+  }
+
+  private void scanDirsWithQuota(INodeDirectory dir,
+      HashMap<String, Long> nsMap,
+      HashMap<String, Long> dsMap, boolean verify) {
+    if (dir.isQuotaSet()) {
+      // get the current consumption
+      QuotaCounts q = dir.getDirectoryWithQuotaFeature().getSpaceConsumed();
+      String name = dir.getFullPathName();
+      if (verify) {
+        assertEquals(nsMap.get(name).longValue(), q.getNameSpace());
+        assertEquals(dsMap.get(name).longValue(), q.getStorageSpace());
+      } else {
+        nsMap.put(name, Long.valueOf(q.getNameSpace()));
+        dsMap.put(name, Long.valueOf(q.getStorageSpace()));
+      }
+    }
+
+    for (INode child : dir.getChildrenList(Snapshot.CURRENT_STATE_ID)) {
+      if (child instanceof INodeDirectory) {
+        scanDirsWithQuota((INodeDirectory)child, nsMap, dsMap, verify);
+      }
+    }
+  }
 }

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageWithSnapshot.java

@@ -159,7 +159,7 @@ public class TestFSImageWithSnapshot {
     try {
       loader.load(imageFile, false);
       FSImage.updateCountForQuota(fsn.getBlockManager().getStoragePolicySuite(),
-          INodeDirectory.valueOf(fsn.getFSDirectory().getINode("/"), "/"));
+          INodeDirectory.valueOf(fsn.getFSDirectory().getINode("/"), "/"), 4);
     } finally {
       fsn.getFSDirectory().writeUnlock();
       fsn.writeUnlock();
@@ -509,4 +509,4 @@ public class TestFSImageWithSnapshot {
     fsn = cluster.getNamesystem();
     hdfs = cluster.getFileSystem();
   }
-}
+}