|
@@ -27,6 +27,8 @@ import java.net.URI;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Collection;
|
|
|
import java.util.Collections;
|
|
|
+import java.util.concurrent.ForkJoinPool;
|
|
|
+import java.util.concurrent.RecursiveAction;
|
|
|
import java.util.EnumSet;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.Iterator;
|
|
@@ -65,6 +67,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
|
|
import org.apache.hadoop.hdfs.util.Canceler;
|
|
|
import org.apache.hadoop.hdfs.util.MD5FileUtils;
|
|
|
+import org.apache.hadoop.hdfs.util.ReadOnlyList;
|
|
|
import org.apache.hadoop.io.MD5Hash;
|
|
|
import org.apache.hadoop.util.Time;
|
|
|
|
|
@@ -96,6 +99,7 @@ public class FSImage implements Closeable {
|
|
|
final private Configuration conf;
|
|
|
|
|
|
protected NNStorageRetentionManager archivalManager;
|
|
|
+ private int quotaInitThreads;
|
|
|
|
|
|
/**
|
|
|
* Construct an FSImage
|
|
@@ -130,6 +134,10 @@ public class FSImage implements Closeable {
|
|
|
storage.setRestoreFailedStorage(true);
|
|
|
}
|
|
|
|
|
|
+ this.quotaInitThreads = conf.getInt(
|
|
|
+ DFSConfigKeys.DFS_NAMENODE_QUOTA_INIT_THREADS_KEY,
|
|
|
+ DFSConfigKeys.DFS_NAMENODE_QUOTA_INIT_THREADS_DEFAULT);
|
|
|
+
|
|
|
this.editLog = new FSEditLog(conf, storage, editsDirs);
|
|
|
|
|
|
archivalManager = new NNStorageRetentionManager(conf, storage, editLog);
|
|
@@ -840,7 +848,7 @@ public class FSImage implements Closeable {
|
|
|
} finally {
|
|
|
FSEditLog.closeAllStreams(editStreams);
|
|
|
// update the counts
|
|
|
- updateCountForQuota(target.dir.rootDir);
|
|
|
+ updateCountForQuota(target.dir.rootDir, quotaInitThreads);
|
|
|
}
|
|
|
prog.endPhase(Phase.LOADING_EDITS);
|
|
|
return lastAppliedTxId - prevLastAppliedTxId;
|
|
@@ -854,47 +862,82 @@ public class FSImage implements Closeable {
|
|
|
* This is an update of existing state of the filesystem and does not
|
|
|
* throw QuotaExceededException.
|
|
|
*/
|
|
|
- static void updateCountForQuota(INodeDirectory root) {
|
|
|
- updateCountForQuotaRecursively(root, Quota.Counts.newInstance());
|
|
|
+ static void updateCountForQuota(INodeDirectory root, int threads) {
|
|
|
+ threads = (threads < 1) ? 1 : threads;
|
|
|
+ LOG.info("Initializing quota with " + threads + " thread(s)");
|
|
|
+ long start = Time.now();
|
|
|
+ Quota.Counts counts = Quota.Counts.newInstance();
|
|
|
+ ForkJoinPool p = new ForkJoinPool(threads);
|
|
|
+ RecursiveAction task = new InitQuotaTask(root, counts);
|
|
|
+ p.execute(task);
|
|
|
+ task.join();
|
|
|
+ LOG.info("Quota initialization completed in " + (Time.now() - start) +
|
|
|
+ " milliseconds\n" + counts);
|
|
|
}
|
|
|
-
|
|
|
- private static void updateCountForQuotaRecursively(INodeDirectory dir,
|
|
|
- Quota.Counts counts) {
|
|
|
- final long parentNamespace = counts.get(Quota.NAMESPACE);
|
|
|
- final long parentDiskspace = counts.get(Quota.DISKSPACE);
|
|
|
|
|
|
- dir.computeQuotaUsage4CurrentDirectory(counts);
|
|
|
-
|
|
|
- for (INode child : dir.getChildrenList(Snapshot.CURRENT_STATE_ID)) {
|
|
|
- if (child.isDirectory()) {
|
|
|
- updateCountForQuotaRecursively(child.asDirectory(), counts);
|
|
|
- } else {
|
|
|
- // file or symlink: count here to reduce recursive calls.
|
|
|
- child.computeQuotaUsage(counts, false);
|
|
|
- }
|
|
|
+ /**
|
|
|
+ * parallel initialization using fork-join.
|
|
|
+ */
|
|
|
+ private static class InitQuotaTask extends RecursiveAction {
|
|
|
+ private final INodeDirectory dir;
|
|
|
+ private final Quota.Counts counts;
|
|
|
+
|
|
|
+ public InitQuotaTask(INodeDirectory dir, Quota.Counts counts) {
|
|
|
+ this.dir = dir;
|
|
|
+ this.counts = counts;
|
|
|
}
|
|
|
-
|
|
|
- if (dir.isQuotaSet()) {
|
|
|
- // check if quota is violated. It indicates a software bug.
|
|
|
- final Quota.Counts q = dir.getQuotaCounts();
|
|
|
-
|
|
|
- final long namespace = counts.get(Quota.NAMESPACE) - parentNamespace;
|
|
|
- final long nsQuota = q.get(Quota.NAMESPACE);
|
|
|
- if (Quota.isViolated(nsQuota, namespace)) {
|
|
|
- LOG.error("BUG: Namespace quota violation in image for "
|
|
|
- + dir.getFullPathName()
|
|
|
- + " quota = " + nsQuota + " < consumed = " + namespace);
|
|
|
+
|
|
|
+ public void compute() {
|
|
|
+ Quota.Counts myCounts = Quota.Counts.newInstance();
|
|
|
+ dir.computeQuotaUsage4CurrentDirectory(myCounts);
|
|
|
+
|
|
|
+ ReadOnlyList<INode> children =
|
|
|
+ dir.getChildrenList(Snapshot.CURRENT_STATE_ID);
|
|
|
+
|
|
|
+ if (children.size() > 0) {
|
|
|
+ List<InitQuotaTask> subtasks = new ArrayList<InitQuotaTask>();
|
|
|
+ for (INode child : children) {
|
|
|
+ if (child.isDirectory()) {
|
|
|
+ subtasks.add(new InitQuotaTask(child.asDirectory(), myCounts));
|
|
|
+ } else {
|
|
|
+ // file or symlink. count using the local counts variable
|
|
|
+ child.computeQuotaUsage(myCounts, false, Snapshot.CURRENT_STATE_ID);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // invoke and wait for completion
|
|
|
+ invokeAll(subtasks);
|
|
|
}
|
|
|
|
|
|
- final long diskspace = counts.get(Quota.DISKSPACE) - parentDiskspace;
|
|
|
- final long dsQuota = q.get(Quota.DISKSPACE);
|
|
|
- if (Quota.isViolated(dsQuota, diskspace)) {
|
|
|
- LOG.error("BUG: Diskspace quota violation in image for "
|
|
|
- + dir.getFullPathName()
|
|
|
- + " quota = " + dsQuota + " < consumed = " + diskspace);
|
|
|
+ if (dir.isQuotaSet()) {
|
|
|
+ // check if quota is violated. It indicates a software bug.
|
|
|
+ final Quota.Counts q = dir.getQuotaCounts();
|
|
|
+
|
|
|
+ final long nsConsumed = myCounts.get(Quota.NAMESPACE);
|
|
|
+ final long nsQuota = q.get(Quota.NAMESPACE);
|
|
|
+ if (Quota.isViolated(nsQuota, nsConsumed)) {
|
|
|
+ LOG.warn("Namespace quota violation in image for "
|
|
|
+ + dir.getFullPathName()
|
|
|
+ + " quota = " + nsQuota + " < consumed = " + nsConsumed);
|
|
|
+ }
|
|
|
+
|
|
|
+ final long ssConsumed = myCounts.get(Quota.DISKSPACE);
|
|
|
+ final long ssQuota = q.get(Quota.DISKSPACE);
|
|
|
+ if (Quota.isViolated(ssQuota, ssConsumed)) {
|
|
|
+ LOG.warn("Storagespace quota violation in image for "
|
|
|
+ + dir.getFullPathName()
|
|
|
+ + " quota = " + ssQuota + " < consumed = " + ssConsumed);
|
|
|
+ }
|
|
|
+
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Setting quota for " + dir + "\n" + myCounts);
|
|
|
+ }
|
|
|
+ dir.getDirectoryWithQuotaFeature()
|
|
|
+ .setSpaceConsumed(nsConsumed, ssConsumed);
|
|
|
}
|
|
|
|
|
|
- dir.getDirectoryWithQuotaFeature().setSpaceConsumed(namespace, diskspace);
|
|
|
+ synchronized(counts) {
|
|
|
+ counts.add(myCounts);
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|