|
@@ -27,6 +27,8 @@ import java.net.URI;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Collection;
|
|
|
import java.util.Collections;
|
|
|
+import java.util.concurrent.ForkJoinPool;
|
|
|
+import java.util.concurrent.RecursiveAction;
|
|
|
import java.util.EnumSet;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.HashSet;
|
|
@@ -70,6 +72,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
|
|
import org.apache.hadoop.hdfs.util.Canceler;
|
|
|
import org.apache.hadoop.hdfs.util.EnumCounters;
|
|
|
import org.apache.hadoop.hdfs.util.MD5FileUtils;
|
|
|
+import org.apache.hadoop.hdfs.util.ReadOnlyList;
|
|
|
import org.apache.hadoop.io.MD5Hash;
|
|
|
import org.apache.hadoop.util.Time;
|
|
|
|
|
@@ -100,6 +103,7 @@ public class FSImage implements Closeable {
|
|
|
final private Configuration conf;
|
|
|
|
|
|
protected NNStorageRetentionManager archivalManager;
|
|
|
+ private int quotaInitThreads;
|
|
|
|
|
|
/**
|
|
|
* The collection of newly added storage directories. These are partially
|
|
@@ -153,6 +157,10 @@ public class FSImage implements Closeable {
|
|
|
storage.setRestoreFailedStorage(true);
|
|
|
}
|
|
|
|
|
|
+ this.quotaInitThreads = conf.getInt(
|
|
|
+ DFSConfigKeys.DFS_NAMENODE_QUOTA_INIT_THREADS_KEY,
|
|
|
+ DFSConfigKeys.DFS_NAMENODE_QUOTA_INIT_THREADS_DEFAULT);
|
|
|
+
|
|
|
this.editLog = new FSEditLog(conf, storage, editsDirs);
|
|
|
|
|
|
archivalManager = new NNStorageRetentionManager(conf, storage, editLog);
|
|
@@ -902,7 +910,7 @@ public class FSImage implements Closeable {
|
|
|
FSEditLog.closeAllStreams(editStreams);
|
|
|
// update the counts
|
|
|
updateCountForQuota(target.getBlockManager().getStoragePolicySuite(),
|
|
|
- target.dir.rootDir);
|
|
|
+ target.dir.rootDir, quotaInitThreads);
|
|
|
}
|
|
|
prog.endPhase(Phase.LOADING_EDITS);
|
|
|
return lastAppliedTxId - prevLastAppliedTxId;
|
|
@@ -917,65 +925,104 @@ public class FSImage implements Closeable {
|
|
|
* throw QuotaExceededException.
|
|
|
*/
|
|
|
static void updateCountForQuota(BlockStoragePolicySuite bsps,
|
|
|
- INodeDirectory root) {
|
|
|
- updateCountForQuotaRecursively(bsps, root.getStoragePolicyID(), root,
|
|
|
- new QuotaCounts.Builder().build());
|
|
|
- }
|
|
|
-
|
|
|
- private static void updateCountForQuotaRecursively(BlockStoragePolicySuite bsps,
|
|
|
- byte blockStoragePolicyId, INodeDirectory dir, QuotaCounts counts) {
|
|
|
- final long parentNamespace = counts.getNameSpace();
|
|
|
- final long parentStoragespace = counts.getStorageSpace();
|
|
|
- final EnumCounters<StorageType> parentTypeSpaces = counts.getTypeSpaces();
|
|
|
-
|
|
|
- dir.computeQuotaUsage4CurrentDirectory(bsps, blockStoragePolicyId, counts);
|
|
|
-
|
|
|
- for (INode child : dir.getChildrenList(Snapshot.CURRENT_STATE_ID)) {
|
|
|
- final byte childPolicyId = child.getStoragePolicyIDForQuota(blockStoragePolicyId);
|
|
|
- if (child.isDirectory()) {
|
|
|
- updateCountForQuotaRecursively(bsps, childPolicyId,
|
|
|
- child.asDirectory(), counts);
|
|
|
- } else {
|
|
|
- // file or symlink: count here to reduce recursive calls.
|
|
|
- child.computeQuotaUsage(bsps, childPolicyId, counts, false,
|
|
|
- Snapshot.CURRENT_STATE_ID);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- if (dir.isQuotaSet()) {
|
|
|
- // check if quota is violated. It indicates a software bug.
|
|
|
- final QuotaCounts q = dir.getQuotaCounts();
|
|
|
-
|
|
|
- final long namespace = counts.getNameSpace() - parentNamespace;
|
|
|
- final long nsQuota = q.getNameSpace();
|
|
|
- if (Quota.isViolated(nsQuota, namespace)) {
|
|
|
- LOG.warn("Namespace quota violation in image for "
|
|
|
- + dir.getFullPathName()
|
|
|
- + " quota = " + nsQuota + " < consumed = " + namespace);
|
|
|
- }
|
|
|
+ INodeDirectory root, int threads) {
|
|
|
+ threads = (threads < 1) ? 1 : threads;
|
|
|
+ LOG.info("Initializing quota with " + threads + " thread(s)");
|
|
|
+ long start = Time.now();
|
|
|
+ QuotaCounts counts = new QuotaCounts.Builder().build();
|
|
|
+ ForkJoinPool p = new ForkJoinPool(threads);
|
|
|
+ RecursiveAction task = new InitQuotaTask(bsps, root.getStoragePolicyID(),
|
|
|
+ root, counts);
|
|
|
+ p.execute(task);
|
|
|
+ task.join();
|
|
|
+ LOG.info("Quota initialization completed in " + (Time.now() - start) +
|
|
|
+ " milliseconds\n" + counts);
|
|
|
+ }
|
|
|
|
|
|
- final long ssConsumed = counts.getStorageSpace() - parentStoragespace;
|
|
|
- final long ssQuota = q.getStorageSpace();
|
|
|
- if (Quota.isViolated(ssQuota, ssConsumed)) {
|
|
|
- LOG.warn("Storagespace quota violation in image for "
|
|
|
- + dir.getFullPathName()
|
|
|
- + " quota = " + ssQuota + " < consumed = " + ssConsumed);
|
|
|
+ /**
|
|
|
+ * parallel initialization using fork-join.
|
|
|
+ */
|
|
|
+ private static class InitQuotaTask extends RecursiveAction {
|
|
|
+ private final INodeDirectory dir;
|
|
|
+ private final QuotaCounts counts;
|
|
|
+ private final BlockStoragePolicySuite bsps;
|
|
|
+ private final byte blockStoragePolicyId;
|
|
|
+
|
|
|
+ public InitQuotaTask(BlockStoragePolicySuite bsps,
|
|
|
+ byte blockStoragePolicyId, INodeDirectory dir, QuotaCounts counts) {
|
|
|
+ this.dir = dir;
|
|
|
+ this.counts = counts;
|
|
|
+ this.bsps = bsps;
|
|
|
+ this.blockStoragePolicyId = blockStoragePolicyId;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void compute() {
|
|
|
+ QuotaCounts myCounts = new QuotaCounts.Builder().build();
|
|
|
+ dir.computeQuotaUsage4CurrentDirectory(bsps, blockStoragePolicyId,
|
|
|
+ myCounts);
|
|
|
+
|
|
|
+ ReadOnlyList<INode> children =
|
|
|
+ dir.getChildrenList(Snapshot.CURRENT_STATE_ID);
|
|
|
+
|
|
|
+ if (children.size() > 0) {
|
|
|
+ List<InitQuotaTask> subtasks = new ArrayList<InitQuotaTask>();
|
|
|
+ for (INode child : children) {
|
|
|
+ final byte childPolicyId =
|
|
|
+ child.getStoragePolicyIDForQuota(blockStoragePolicyId);
|
|
|
+ if (child.isDirectory()) {
|
|
|
+ subtasks.add(new InitQuotaTask(bsps, childPolicyId,
|
|
|
+ child.asDirectory(), myCounts));
|
|
|
+ } else {
|
|
|
+ // file or symlink. count using the local counts variable
|
|
|
+ child.computeQuotaUsage(bsps, childPolicyId, myCounts,
|
|
|
+ false, Snapshot.CURRENT_STATE_ID);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // invoke and wait for completion
|
|
|
+ invokeAll(subtasks);
|
|
|
}
|
|
|
|
|
|
- final EnumCounters<StorageType> typeSpaces = counts.getTypeSpaces();
|
|
|
- for (StorageType t : StorageType.getTypesSupportingQuota()) {
|
|
|
- final long typeSpace = typeSpaces.get(t) - parentTypeSpaces.get(t);
|
|
|
- final long typeQuota = q.getTypeSpaces().get(t);
|
|
|
- if (Quota.isViolated(typeQuota, typeSpace)) {
|
|
|
- LOG.warn("Storage type quota violation in image for "
|
|
|
+ if (dir.isQuotaSet()) {
|
|
|
+ // check if quota is violated. It indicates a software bug.
|
|
|
+ final QuotaCounts q = dir.getQuotaCounts();
|
|
|
+
|
|
|
+ final long nsConsumed = myCounts.getNameSpace();
|
|
|
+ final long nsQuota = q.getNameSpace();
|
|
|
+ if (Quota.isViolated(nsQuota, nsConsumed)) {
|
|
|
+ LOG.warn("Namespace quota violation in image for "
|
|
|
+ + dir.getFullPathName()
|
|
|
+ + " quota = " + nsQuota + " < consumed = " + nsConsumed);
|
|
|
+ }
|
|
|
+
|
|
|
+ final long ssConsumed = myCounts.getStorageSpace();
|
|
|
+ final long ssQuota = q.getStorageSpace();
|
|
|
+ if (Quota.isViolated(ssQuota, ssConsumed)) {
|
|
|
+ LOG.warn("Storagespace quota violation in image for "
|
|
|
+ dir.getFullPathName()
|
|
|
- + " type = " + t.toString() + " quota = "
|
|
|
- + typeQuota + " < consumed " + typeSpace);
|
|
|
+ + " quota = " + ssQuota + " < consumed = " + ssConsumed);
|
|
|
}
|
|
|
+
|
|
|
+ final EnumCounters<StorageType> tsConsumed = myCounts.getTypeSpaces();
|
|
|
+ for (StorageType t : StorageType.getTypesSupportingQuota()) {
|
|
|
+ final long typeSpace = tsConsumed.get(t);
|
|
|
+ final long typeQuota = q.getTypeSpaces().get(t);
|
|
|
+ if (Quota.isViolated(typeQuota, typeSpace)) {
|
|
|
+ LOG.warn("Storage type quota violation in image for "
|
|
|
+ + dir.getFullPathName()
|
|
|
+ + " type = " + t.toString() + " quota = "
|
|
|
+ + typeQuota + " < consumed " + typeSpace);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Setting quota for " + dir + "\n" + myCounts);
|
|
|
+ }
|
|
|
+ dir.getDirectoryWithQuotaFeature().setSpaceConsumed(nsConsumed,
|
|
|
+ ssConsumed, tsConsumed);
|
|
|
}
|
|
|
|
|
|
- dir.getDirectoryWithQuotaFeature().setSpaceConsumed(namespace, ssConsumed,
|
|
|
- typeSpaces);
|
|
|
+ synchronized(counts) {
|
|
|
+ counts.add(myCounts);
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|