|
@@ -27,8 +27,6 @@ import java.net.URI;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Collection;
|
|
|
import java.util.Collections;
|
|
|
-import java.util.concurrent.ForkJoinPool;
|
|
|
-import java.util.concurrent.RecursiveAction;
|
|
|
import java.util.EnumSet;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.HashSet;
|
|
@@ -42,12 +40,10 @@ import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
|
import org.apache.hadoop.classification.InterfaceStability;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
-import org.apache.hadoop.fs.StorageType;
|
|
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
|
import org.apache.hadoop.hdfs.DFSUtil;
|
|
|
import org.apache.hadoop.hdfs.HAUtil;
|
|
|
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
|
|
|
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
|
|
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
|
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
|
|
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.RollingUpgradeStartupOption;
|
|
@@ -61,7 +57,6 @@ import org.apache.hadoop.hdfs.server.common.Util;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.FSImageStorageInspector.FSImageFile;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
|
|
|
-import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
|
|
@@ -70,9 +65,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
|
|
import org.apache.hadoop.hdfs.util.Canceler;
|
|
|
-import org.apache.hadoop.hdfs.util.EnumCounters;
|
|
|
import org.apache.hadoop.hdfs.util.MD5FileUtils;
|
|
|
-import org.apache.hadoop.hdfs.util.ReadOnlyList;
|
|
|
import org.apache.hadoop.io.MD5Hash;
|
|
|
import org.apache.hadoop.util.Time;
|
|
|
|
|
@@ -147,12 +140,7 @@ public class FSImage implements Closeable {
|
|
|
storage.setRestoreFailedStorage(true);
|
|
|
}
|
|
|
|
|
|
- this.quotaInitThreads = conf.getInt(
|
|
|
- DFSConfigKeys.DFS_NAMENODE_QUOTA_INIT_THREADS_KEY,
|
|
|
- DFSConfigKeys.DFS_NAMENODE_QUOTA_INIT_THREADS_DEFAULT);
|
|
|
-
|
|
|
this.editLog = new FSEditLog(conf, storage, editsDirs);
|
|
|
-
|
|
|
archivalManager = new NNStorageRetentionManager(conf, storage, editLog);
|
|
|
}
|
|
|
|
|
@@ -857,125 +845,11 @@ public class FSImage implements Closeable {
|
|
|
}
|
|
|
} finally {
|
|
|
FSEditLog.closeAllStreams(editStreams);
|
|
|
- // update the counts
|
|
|
- updateCountForQuota(target.getBlockManager().getStoragePolicySuite(),
|
|
|
- target.dir.rootDir, quotaInitThreads);
|
|
|
}
|
|
|
prog.endPhase(Phase.LOADING_EDITS);
|
|
|
return lastAppliedTxId - prevLastAppliedTxId;
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Update the count of each directory with quota in the namespace.
|
|
|
- * A directory's count is defined as the total number inodes in the tree
|
|
|
- * rooted at the directory.
|
|
|
- *
|
|
|
- * This is an update of existing state of the filesystem and does not
|
|
|
- * throw QuotaExceededException.
|
|
|
- */
|
|
|
- static void updateCountForQuota(BlockStoragePolicySuite bsps,
|
|
|
- INodeDirectory root, int threads) {
|
|
|
- threads = (threads < 1) ? 1 : threads;
|
|
|
- LOG.info("Initializing quota with " + threads + " thread(s)");
|
|
|
- long start = Time.now();
|
|
|
- QuotaCounts counts = new QuotaCounts.Builder().build();
|
|
|
- ForkJoinPool p = new ForkJoinPool(threads);
|
|
|
- RecursiveAction task = new InitQuotaTask(bsps, root.getStoragePolicyID(),
|
|
|
- root, counts);
|
|
|
- p.execute(task);
|
|
|
- task.join();
|
|
|
- p.shutdown();
|
|
|
- LOG.info("Quota initialization completed in " + (Time.now() - start) +
|
|
|
- " milliseconds\n" + counts);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * parallel initialization using fork-join.
|
|
|
- */
|
|
|
- private static class InitQuotaTask extends RecursiveAction {
|
|
|
- private final INodeDirectory dir;
|
|
|
- private final QuotaCounts counts;
|
|
|
- private final BlockStoragePolicySuite bsps;
|
|
|
- private final byte blockStoragePolicyId;
|
|
|
-
|
|
|
- public InitQuotaTask(BlockStoragePolicySuite bsps,
|
|
|
- byte blockStoragePolicyId, INodeDirectory dir, QuotaCounts counts) {
|
|
|
- this.dir = dir;
|
|
|
- this.counts = counts;
|
|
|
- this.bsps = bsps;
|
|
|
- this.blockStoragePolicyId = blockStoragePolicyId;
|
|
|
- }
|
|
|
-
|
|
|
- public void compute() {
|
|
|
- QuotaCounts myCounts = new QuotaCounts.Builder().build();
|
|
|
- dir.computeQuotaUsage4CurrentDirectory(bsps, blockStoragePolicyId,
|
|
|
- myCounts);
|
|
|
-
|
|
|
- ReadOnlyList<INode> children =
|
|
|
- dir.getChildrenList(Snapshot.CURRENT_STATE_ID);
|
|
|
-
|
|
|
- if (children.size() > 0) {
|
|
|
- List<InitQuotaTask> subtasks = new ArrayList<InitQuotaTask>();
|
|
|
- for (INode child : children) {
|
|
|
- final byte childPolicyId =
|
|
|
- child.getStoragePolicyIDForQuota(blockStoragePolicyId);
|
|
|
- if (child.isDirectory()) {
|
|
|
- subtasks.add(new InitQuotaTask(bsps, childPolicyId,
|
|
|
- child.asDirectory(), myCounts));
|
|
|
- } else {
|
|
|
- // file or symlink. count using the local counts variable
|
|
|
- myCounts.add(child.computeQuotaUsage(bsps, childPolicyId, false,
|
|
|
- Snapshot.CURRENT_STATE_ID));
|
|
|
- }
|
|
|
- }
|
|
|
- // invoke and wait for completion
|
|
|
- invokeAll(subtasks);
|
|
|
- }
|
|
|
-
|
|
|
- if (dir.isQuotaSet()) {
|
|
|
- // check if quota is violated. It indicates a software bug.
|
|
|
- final QuotaCounts q = dir.getQuotaCounts();
|
|
|
-
|
|
|
- final long nsConsumed = myCounts.getNameSpace();
|
|
|
- final long nsQuota = q.getNameSpace();
|
|
|
- if (Quota.isViolated(nsQuota, nsConsumed)) {
|
|
|
- LOG.warn("Namespace quota violation in image for "
|
|
|
- + dir.getFullPathName()
|
|
|
- + " quota = " + nsQuota + " < consumed = " + nsConsumed);
|
|
|
- }
|
|
|
-
|
|
|
- final long ssConsumed = myCounts.getStorageSpace();
|
|
|
- final long ssQuota = q.getStorageSpace();
|
|
|
- if (Quota.isViolated(ssQuota, ssConsumed)) {
|
|
|
- LOG.warn("Storagespace quota violation in image for "
|
|
|
- + dir.getFullPathName()
|
|
|
- + " quota = " + ssQuota + " < consumed = " + ssConsumed);
|
|
|
- }
|
|
|
-
|
|
|
- final EnumCounters<StorageType> tsConsumed = myCounts.getTypeSpaces();
|
|
|
- for (StorageType t : StorageType.getTypesSupportingQuota()) {
|
|
|
- final long typeSpace = tsConsumed.get(t);
|
|
|
- final long typeQuota = q.getTypeSpaces().get(t);
|
|
|
- if (Quota.isViolated(typeQuota, typeSpace)) {
|
|
|
- LOG.warn("Storage type quota violation in image for "
|
|
|
- + dir.getFullPathName()
|
|
|
- + " type = " + t.toString() + " quota = "
|
|
|
- + typeQuota + " < consumed " + typeSpace);
|
|
|
- }
|
|
|
- }
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("Setting quota for " + dir + "\n" + myCounts);
|
|
|
- }
|
|
|
- dir.getDirectoryWithQuotaFeature().setSpaceConsumed(nsConsumed,
|
|
|
- ssConsumed, tsConsumed);
|
|
|
- }
|
|
|
-
|
|
|
- synchronized(counts) {
|
|
|
- counts.add(myCounts);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* Load the image namespace from the given image file, verifying
|
|
|
* it against the MD5 sum stored in its associated .md5 file.
|