|
@@ -17,53 +17,97 @@
|
|
*/
|
|
*/
|
|
package org.apache.hadoop.hdfs.server.namenode;
|
|
package org.apache.hadoop.hdfs.server.namenode;
|
|
|
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DESCRIPTORS_NUM_RESPONSES;
|
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DESCRIPTORS_NUM_RESPONSES_DEFAULT;
|
|
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES;
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES;
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES_DEFAULT;
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES_DEFAULT;
|
|
|
|
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DESCRIPTORS_NUM_RESPONSES;
|
|
|
|
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY;
|
|
|
|
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_DEFAULT;
|
|
|
|
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DESCRIPTORS_NUM_RESPONSES_DEFAULT;
|
|
|
|
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS;
|
|
|
|
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS_DEFAULT;
|
|
|
|
|
|
|
|
+import java.io.Closeable;
|
|
import java.io.DataInput;
|
|
import java.io.DataInput;
|
|
import java.io.DataOutput;
|
|
import java.io.DataOutput;
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
import java.util.ArrayList;
|
|
import java.util.ArrayList;
|
|
|
|
+import java.util.Collection;
|
|
import java.util.Iterator;
|
|
import java.util.Iterator;
|
|
|
|
+import java.util.LinkedList;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
import java.util.Map.Entry;
|
|
import java.util.Map.Entry;
|
|
import java.util.SortedMap;
|
|
import java.util.SortedMap;
|
|
import java.util.TreeMap;
|
|
import java.util.TreeMap;
|
|
|
|
|
|
|
|
+import org.apache.commons.io.IOUtils;
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
|
+import org.apache.hadoop.classification.InterfaceAudience;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
|
|
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
|
|
import org.apache.hadoop.fs.Path;
|
|
import org.apache.hadoop.fs.Path;
|
|
import org.apache.hadoop.fs.permission.FsAction;
|
|
import org.apache.hadoop.fs.permission.FsAction;
|
|
import org.apache.hadoop.hdfs.DFSUtil;
|
|
import org.apache.hadoop.hdfs.DFSUtil;
|
|
-import org.apache.hadoop.hdfs.protocol.AddPathBasedCacheDirectiveException.InvalidPoolNameError;
|
|
|
|
-import org.apache.hadoop.hdfs.protocol.AddPathBasedCacheDirectiveException.PoolWritePermissionDeniedError;
|
|
|
|
|
|
+import org.apache.hadoop.hdfs.protocol.Block;
|
|
|
|
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
|
|
|
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportIterator;
|
|
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
|
|
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
|
|
-import org.apache.hadoop.hdfs.protocol.PathBasedCacheDescriptor;
|
|
|
|
|
|
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
|
|
|
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
|
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
|
|
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
|
|
|
|
+import org.apache.hadoop.hdfs.protocol.PathBasedCacheDescriptor;
|
|
|
|
+import org.apache.hadoop.hdfs.protocol.AddPathBasedCacheDirectiveException.InvalidPoolNameError;
|
|
|
|
+import org.apache.hadoop.hdfs.protocol.AddPathBasedCacheDirectiveException.UnexpectedAddPathBasedCacheDirectiveException;
|
|
|
|
+import org.apache.hadoop.hdfs.protocol.AddPathBasedCacheDirectiveException.PoolWritePermissionDeniedError;
|
|
import org.apache.hadoop.hdfs.protocol.PathBasedCacheEntry;
|
|
import org.apache.hadoop.hdfs.protocol.PathBasedCacheEntry;
|
|
import org.apache.hadoop.hdfs.protocol.RemovePathBasedCacheDescriptorException.InvalidIdException;
|
|
import org.apache.hadoop.hdfs.protocol.RemovePathBasedCacheDescriptorException.InvalidIdException;
|
|
import org.apache.hadoop.hdfs.protocol.RemovePathBasedCacheDescriptorException.NoSuchIdException;
|
|
import org.apache.hadoop.hdfs.protocol.RemovePathBasedCacheDescriptorException.NoSuchIdException;
|
|
-import org.apache.hadoop.hdfs.protocol.RemovePathBasedCacheDescriptorException.RemovePermissionDeniedException;
|
|
|
|
import org.apache.hadoop.hdfs.protocol.RemovePathBasedCacheDescriptorException.UnexpectedRemovePathBasedCacheDescriptorException;
|
|
import org.apache.hadoop.hdfs.protocol.RemovePathBasedCacheDescriptorException.UnexpectedRemovePathBasedCacheDescriptorException;
|
|
|
|
+import org.apache.hadoop.hdfs.protocol.RemovePathBasedCacheDescriptorException.RemovePermissionDeniedException;
|
|
|
|
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
|
|
|
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
|
|
|
+import org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor;
|
|
|
|
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
|
|
|
|
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList;
|
|
|
|
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type;
|
|
|
|
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
|
|
|
|
+import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
|
|
import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
|
|
import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
|
|
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress;
|
|
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress;
|
|
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress.Counter;
|
|
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress.Counter;
|
|
import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step;
|
|
import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step;
|
|
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StepType;
|
|
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StepType;
|
|
import org.apache.hadoop.io.Text;
|
|
import org.apache.hadoop.io.Text;
|
|
|
|
+import org.apache.hadoop.util.GSet;
|
|
|
|
+import org.apache.hadoop.util.LightWeightGSet;
|
|
|
|
+import org.apache.hadoop.util.Time;
|
|
|
|
|
|
-import com.google.common.base.Preconditions;
|
|
|
|
|
|
+import com.google.common.annotations.VisibleForTesting;
|
|
|
|
|
|
/**
|
|
/**
|
|
* The Cache Manager handles caching on DataNodes.
|
|
* The Cache Manager handles caching on DataNodes.
|
|
|
|
+ *
|
|
|
|
+ * This class is instantiated by the FSNamesystem when caching is enabled.
|
|
|
|
+ * It maintains the mapping of cached blocks to datanodes via processing
|
|
|
|
+ * datanode cache reports. Based on these reports and addition and removal of
|
|
|
|
+ * caching directives, we will schedule caching and uncaching work.
|
|
*/
|
|
*/
|
|
|
|
+@InterfaceAudience.LimitedPrivate({"HDFS"})
|
|
public final class CacheManager {
|
|
public final class CacheManager {
|
|
public static final Log LOG = LogFactory.getLog(CacheManager.class);
|
|
public static final Log LOG = LogFactory.getLog(CacheManager.class);
|
|
|
|
|
|
|
|
+ // TODO: add pending / underCached / schedule cached blocks stats.
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * The FSNamesystem that contains this CacheManager.
|
|
|
|
+ */
|
|
|
|
+ private final FSNamesystem namesystem;
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * The BlockManager associated with the FSN that owns this CacheManager.
|
|
|
|
+ */
|
|
|
|
+ private final BlockManager blockManager;
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Cache entries, sorted by ID.
|
|
* Cache entries, sorted by ID.
|
|
*
|
|
*
|
|
@@ -73,6 +117,12 @@ public final class CacheManager {
|
|
private final TreeMap<Long, PathBasedCacheEntry> entriesById =
|
|
private final TreeMap<Long, PathBasedCacheEntry> entriesById =
|
|
new TreeMap<Long, PathBasedCacheEntry>();
|
|
new TreeMap<Long, PathBasedCacheEntry>();
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * The entry ID to use for a new entry. Entry IDs always increase, and are
|
|
|
|
+ * never reused.
|
|
|
|
+ */
|
|
|
|
+ private long nextEntryId;
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Cache entries, sorted by path
|
|
* Cache entries, sorted by path
|
|
*/
|
|
*/
|
|
@@ -85,11 +135,6 @@ public final class CacheManager {
|
|
private final TreeMap<String, CachePool> cachePools =
|
|
private final TreeMap<String, CachePool> cachePools =
|
|
new TreeMap<String, CachePool>();
|
|
new TreeMap<String, CachePool>();
|
|
|
|
|
|
- /**
|
|
|
|
- * The entry ID to use for a new entry.
|
|
|
|
- */
|
|
|
|
- private long nextEntryId;
|
|
|
|
-
|
|
|
|
/**
|
|
/**
|
|
* Maximum number of cache pools to list in one operation.
|
|
* Maximum number of cache pools to list in one operation.
|
|
*/
|
|
*/
|
|
@@ -100,44 +145,129 @@ public final class CacheManager {
|
|
*/
|
|
*/
|
|
private final int maxListCacheDescriptorsResponses;
|
|
private final int maxListCacheDescriptorsResponses;
|
|
|
|
|
|
- final private FSNamesystem namesystem;
|
|
|
|
- final private FSDirectory dir;
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Interval between scans in milliseconds.
|
|
|
|
+ */
|
|
|
|
+ private final long scanIntervalMs;
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Whether caching is enabled.
|
|
|
|
+ *
|
|
|
|
+ * If caching is disabled, we will not process cache reports or store
|
|
|
|
+ * information about what is cached where. We also do not start the
|
|
|
|
+ * CacheReplicationMonitor thread. This will save resources, but provide
|
|
|
|
+ * less functionality.
|
|
|
|
+ *
|
|
|
|
+ * Even when caching is disabled, we still store path-based cache
|
|
|
|
+ * information. This information is stored in the edit log and fsimage. We
|
|
|
|
+ * don't want to lose it just because a configuration setting was turned off.
|
|
|
|
+ * However, we will not act on this information if caching is disabled.
|
|
|
|
+ */
|
|
|
|
+ private final boolean enabled;
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Whether the CacheManager is active.
|
|
|
|
+ *
|
|
|
|
+ * When the CacheManager is active, it tells the DataNodes what to cache
|
|
|
|
+ * and uncache. The CacheManager cannot become active if enabled = false.
|
|
|
|
+ */
|
|
|
|
+ private boolean active = false;
|
|
|
|
|
|
- CacheManager(FSNamesystem namesystem, FSDirectory dir, Configuration conf) {
|
|
|
|
- clear();
|
|
|
|
|
|
+ /**
|
|
|
|
+ * All cached blocks.
|
|
|
|
+ */
|
|
|
|
+ private final GSet<CachedBlock, CachedBlock> cachedBlocks;
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * The CacheReplicationMonitor.
|
|
|
|
+ */
|
|
|
|
+ private CacheReplicationMonitor monitor;
|
|
|
|
+
|
|
|
|
+ CacheManager(FSNamesystem namesystem, Configuration conf,
|
|
|
|
+ BlockManager blockManager) {
|
|
this.namesystem = namesystem;
|
|
this.namesystem = namesystem;
|
|
- this.dir = dir;
|
|
|
|
- maxListCachePoolsResponses = conf.getInt(
|
|
|
|
|
|
+ this.blockManager = blockManager;
|
|
|
|
+ this.nextEntryId = 1;
|
|
|
|
+ this.maxListCachePoolsResponses = conf.getInt(
|
|
DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES,
|
|
DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES,
|
|
DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES_DEFAULT);
|
|
DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES_DEFAULT);
|
|
- maxListCacheDescriptorsResponses = conf.getInt(
|
|
|
|
|
|
+ this.maxListCacheDescriptorsResponses = conf.getInt(
|
|
DFS_NAMENODE_LIST_CACHE_DESCRIPTORS_NUM_RESPONSES,
|
|
DFS_NAMENODE_LIST_CACHE_DESCRIPTORS_NUM_RESPONSES,
|
|
DFS_NAMENODE_LIST_CACHE_DESCRIPTORS_NUM_RESPONSES_DEFAULT);
|
|
DFS_NAMENODE_LIST_CACHE_DESCRIPTORS_NUM_RESPONSES_DEFAULT);
|
|
|
|
+ scanIntervalMs = conf.getLong(
|
|
|
|
+ DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS,
|
|
|
|
+ DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS_DEFAULT);
|
|
|
|
+ this.enabled = conf.getBoolean(DFS_NAMENODE_CACHING_ENABLED_KEY,
|
|
|
|
+ DFS_NAMENODE_CACHING_ENABLED_DEFAULT);
|
|
|
|
+ this.cachedBlocks = !enabled ? null :
|
|
|
|
+ new LightWeightGSet<CachedBlock, CachedBlock>(
|
|
|
|
+ LightWeightGSet.computeCapacity(0.25, "cachedBlocks"));
|
|
}
|
|
}
|
|
|
|
|
|
- synchronized void clear() {
|
|
|
|
- entriesById.clear();
|
|
|
|
- entriesByPath.clear();
|
|
|
|
- cachePools.clear();
|
|
|
|
- nextEntryId = 1;
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Activate the cache manager.
|
|
|
|
+ *
|
|
|
|
+ * When the cache manager is active, tell the datanodes where to cache files.
|
|
|
|
+ */
|
|
|
|
+ public void activate() {
|
|
|
|
+ assert namesystem.hasWriteLock();
|
|
|
|
+ if (enabled && (!active)) {
|
|
|
|
+ LOG.info("Activating CacheManager. " +
|
|
|
|
+ "Starting replication monitor thread...");
|
|
|
|
+ active = true;
|
|
|
|
+ monitor = new CacheReplicationMonitor(namesystem, this,
|
|
|
|
+ scanIntervalMs);
|
|
|
|
+ monitor.start();
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
- * Returns the next entry ID to be used for a PathBasedCacheEntry
|
|
|
|
|
|
+ * Deactivate the cache manager.
|
|
|
|
+ *
|
|
|
|
+ * When the cache manager is inactive, it does not tell the datanodes where to
|
|
|
|
+ * cache files.
|
|
*/
|
|
*/
|
|
- synchronized long getNextEntryId() {
|
|
|
|
- Preconditions.checkArgument(nextEntryId != Long.MAX_VALUE);
|
|
|
|
- return nextEntryId++;
|
|
|
|
|
|
+ public void deactivate() {
|
|
|
|
+ assert namesystem.hasWriteLock();
|
|
|
|
+ if (active) {
|
|
|
|
+ LOG.info("Deactivating CacheManager. " +
|
|
|
|
+ "stopping CacheReplicationMonitor thread...");
|
|
|
|
+ active = false;
|
|
|
|
+ IOUtils.closeQuietly(monitor);
|
|
|
|
+ monitor = null;
|
|
|
|
+ LOG.info("CacheReplicationMonitor thread stopped and deactivated.");
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
- * Returns the PathBasedCacheEntry corresponding to a PathBasedCacheEntry.
|
|
|
|
- *
|
|
|
|
- * @param directive Lookup directive
|
|
|
|
- * @return Corresponding PathBasedCacheEntry, or null if not present.
|
|
|
|
|
|
+ * Return true only if the cache manager is active.
|
|
|
|
+ * Must be called under the FSN read or write lock.
|
|
*/
|
|
*/
|
|
- private synchronized PathBasedCacheEntry
|
|
|
|
- findEntry(PathBasedCacheDirective directive) {
|
|
|
|
|
|
+ public boolean isActive() {
|
|
|
|
+ return active;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public TreeMap<Long, PathBasedCacheEntry> getEntriesById() {
|
|
|
|
+ assert namesystem.hasReadOrWriteLock();
|
|
|
|
+ return entriesById;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @VisibleForTesting
|
|
|
|
+ public GSet<CachedBlock, CachedBlock> getCachedBlocks() {
|
|
|
|
+ assert namesystem.hasReadOrWriteLock();
|
|
|
|
+ return cachedBlocks;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private long getNextEntryId() throws IOException {
|
|
|
|
+ assert namesystem.hasWriteLock();
|
|
|
|
+ if (nextEntryId == Long.MAX_VALUE) {
|
|
|
|
+ throw new IOException("No more available IDs");
|
|
|
|
+ }
|
|
|
|
+ return nextEntryId++;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private PathBasedCacheEntry findEntry(PathBasedCacheDirective directive) {
|
|
|
|
+ assert namesystem.hasReadOrWriteLock();
|
|
List<PathBasedCacheEntry> existing =
|
|
List<PathBasedCacheEntry> existing =
|
|
entriesByPath.get(directive.getPath().toUri().getPath());
|
|
entriesByPath.get(directive.getPath().toUri().getPath());
|
|
if (existing == null) {
|
|
if (existing == null) {
|
|
@@ -151,56 +281,10 @@ public final class CacheManager {
|
|
return null;
|
|
return null;
|
|
}
|
|
}
|
|
|
|
|
|
- /**
|
|
|
|
- * Add a new PathBasedCacheEntry, skipping any validation checks. Called
|
|
|
|
- * directly when reloading CacheManager state from FSImage.
|
|
|
|
- *
|
|
|
|
- * @throws IOException if unable to cache the entry
|
|
|
|
- */
|
|
|
|
- private void unprotectedAddEntry(PathBasedCacheEntry entry)
|
|
|
|
- throws IOException {
|
|
|
|
- assert namesystem.hasWriteLock();
|
|
|
|
- // Add it to the various maps
|
|
|
|
- entriesById.put(entry.getEntryId(), entry);
|
|
|
|
- String path = entry.getPath();
|
|
|
|
- List<PathBasedCacheEntry> entryList = entriesByPath.get(path);
|
|
|
|
- if (entryList == null) {
|
|
|
|
- entryList = new ArrayList<PathBasedCacheEntry>(1);
|
|
|
|
- entriesByPath.put(path, entryList);
|
|
|
|
- }
|
|
|
|
- entryList.add(entry);
|
|
|
|
- // Set the path as cached in the namesystem
|
|
|
|
- try {
|
|
|
|
- INode node = dir.getINode(entry.getPath());
|
|
|
|
- if (node != null && node.isFile()) {
|
|
|
|
- INodeFile file = node.asFile();
|
|
|
|
- // TODO: adjustable cache replication factor
|
|
|
|
- namesystem.setCacheReplicationInt(entry.getPath(),
|
|
|
|
- file.getBlockReplication());
|
|
|
|
- } else {
|
|
|
|
- LOG.warn("Path " + entry.getPath() + " is not a file");
|
|
|
|
- }
|
|
|
|
- } catch (IOException ioe) {
|
|
|
|
- LOG.info("unprotectedAddEntry " + entry +": failed to cache file: " +
|
|
|
|
- ioe.getClass().getName() +": " + ioe.getMessage());
|
|
|
|
- throw ioe;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * Add a new PathBasedCacheDirective if valid, returning a corresponding
|
|
|
|
- * PathBasedCacheDescriptor to the user.
|
|
|
|
- *
|
|
|
|
- * @param directive Directive describing the cache entry being added
|
|
|
|
- * @param pc Permission checker used to validate that the calling user has
|
|
|
|
- * access to the destination cache pool
|
|
|
|
- * @return Corresponding PathBasedCacheDescriptor for the new cache entry
|
|
|
|
- * @throws IOException if the directive is invalid or was otherwise
|
|
|
|
- * unsuccessful
|
|
|
|
- */
|
|
|
|
- public synchronized PathBasedCacheDescriptor addDirective(
|
|
|
|
|
|
+ public PathBasedCacheDescriptor addDirective(
|
|
PathBasedCacheDirective directive, FSPermissionChecker pc)
|
|
PathBasedCacheDirective directive, FSPermissionChecker pc)
|
|
throws IOException {
|
|
throws IOException {
|
|
|
|
+ assert namesystem.hasWriteLock();
|
|
CachePool pool = cachePools.get(directive.getPool());
|
|
CachePool pool = cachePools.get(directive.getPool());
|
|
if (pool == null) {
|
|
if (pool == null) {
|
|
LOG.info("addDirective " + directive + ": pool not found.");
|
|
LOG.info("addDirective " + directive + ": pool not found.");
|
|
@@ -225,47 +309,37 @@ public final class CacheManager {
|
|
"existing directive " + existing + " in this pool.");
|
|
"existing directive " + existing + " in this pool.");
|
|
return existing.getDescriptor();
|
|
return existing.getDescriptor();
|
|
}
|
|
}
|
|
-
|
|
|
|
- // Success!
|
|
|
|
- PathBasedCacheDescriptor d = unprotectedAddDirective(directive);
|
|
|
|
- LOG.info("addDirective " + directive + ": added cache directive "
|
|
|
|
- + directive);
|
|
|
|
- return d;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * Assigns a new entry ID to a validated PathBasedCacheDirective and adds
|
|
|
|
- * it to the CacheManager. Called directly when replaying the edit log.
|
|
|
|
- *
|
|
|
|
- * @param directive Directive being added
|
|
|
|
- * @return PathBasedCacheDescriptor for the directive
|
|
|
|
- * @throws IOException
|
|
|
|
- */
|
|
|
|
- PathBasedCacheDescriptor unprotectedAddDirective(
|
|
|
|
- PathBasedCacheDirective directive) throws IOException {
|
|
|
|
- assert namesystem.hasWriteLock();
|
|
|
|
- CachePool pool = cachePools.get(directive.getPool());
|
|
|
|
// Add a new entry with the next available ID.
|
|
// Add a new entry with the next available ID.
|
|
PathBasedCacheEntry entry;
|
|
PathBasedCacheEntry entry;
|
|
- entry = new PathBasedCacheEntry(getNextEntryId(),
|
|
|
|
- directive.getPath().toUri().getPath(),
|
|
|
|
- directive.getReplication(), pool);
|
|
|
|
-
|
|
|
|
- unprotectedAddEntry(entry);
|
|
|
|
|
|
+ try {
|
|
|
|
+ entry = new PathBasedCacheEntry(getNextEntryId(),
|
|
|
|
+ directive.getPath().toUri().getPath(),
|
|
|
|
+ directive.getReplication(), pool);
|
|
|
|
+ } catch (IOException ioe) {
|
|
|
|
+ throw new UnexpectedAddPathBasedCacheDirectiveException(directive);
|
|
|
|
+ }
|
|
|
|
+ LOG.info("addDirective " + directive + ": added cache directive "
|
|
|
|
+ + directive);
|
|
|
|
|
|
|
|
+ // Success!
|
|
|
|
+ // First, add it to the various maps
|
|
|
|
+ entriesById.put(entry.getEntryId(), entry);
|
|
|
|
+ String path = directive.getPath().toUri().getPath();
|
|
|
|
+ List<PathBasedCacheEntry> entryList = entriesByPath.get(path);
|
|
|
|
+ if (entryList == null) {
|
|
|
|
+ entryList = new ArrayList<PathBasedCacheEntry>(1);
|
|
|
|
+ entriesByPath.put(path, entryList);
|
|
|
|
+ }
|
|
|
|
+ entryList.add(entry);
|
|
|
|
+ if (monitor != null) {
|
|
|
|
+ monitor.kick();
|
|
|
|
+ }
|
|
return entry.getDescriptor();
|
|
return entry.getDescriptor();
|
|
}
|
|
}
|
|
|
|
|
|
- /**
|
|
|
|
- * Remove the PathBasedCacheEntry corresponding to a descriptor ID from
|
|
|
|
- * the CacheManager.
|
|
|
|
- *
|
|
|
|
- * @param id of the PathBasedCacheDescriptor
|
|
|
|
- * @param pc Permissions checker used to validated the request
|
|
|
|
- * @throws IOException
|
|
|
|
- */
|
|
|
|
- public synchronized void removeDescriptor(long id, FSPermissionChecker pc)
|
|
|
|
|
|
+ public void removeDescriptor(long id, FSPermissionChecker pc)
|
|
throws IOException {
|
|
throws IOException {
|
|
|
|
+ assert namesystem.hasWriteLock();
|
|
// Check for invalid IDs.
|
|
// Check for invalid IDs.
|
|
if (id <= 0) {
|
|
if (id <= 0) {
|
|
LOG.info("removeDescriptor " + id + ": invalid non-positive " +
|
|
LOG.info("removeDescriptor " + id + ": invalid non-positive " +
|
|
@@ -290,20 +364,6 @@ public final class CacheManager {
|
|
throw new RemovePermissionDeniedException(id);
|
|
throw new RemovePermissionDeniedException(id);
|
|
}
|
|
}
|
|
|
|
|
|
- unprotectedRemoveDescriptor(id);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * Unchecked internal method used to remove a PathBasedCacheEntry from the
|
|
|
|
- * CacheManager. Called directly when replaying the edit log.
|
|
|
|
- *
|
|
|
|
- * @param id of the PathBasedCacheDescriptor corresponding to the entry that
|
|
|
|
- * is being removed
|
|
|
|
- * @throws IOException
|
|
|
|
- */
|
|
|
|
- void unprotectedRemoveDescriptor(long id) throws IOException {
|
|
|
|
- assert namesystem.hasWriteLock();
|
|
|
|
- PathBasedCacheEntry existing = entriesById.get(id);
|
|
|
|
// Remove the corresponding entry in entriesByPath.
|
|
// Remove the corresponding entry in entriesByPath.
|
|
String path = existing.getDescriptor().getPath().toUri().getPath();
|
|
String path = existing.getDescriptor().getPath().toUri().getPath();
|
|
List<PathBasedCacheEntry> entries = entriesByPath.get(path);
|
|
List<PathBasedCacheEntry> entries = entriesByPath.get(path);
|
|
@@ -314,26 +374,16 @@ public final class CacheManager {
|
|
entriesByPath.remove(path);
|
|
entriesByPath.remove(path);
|
|
}
|
|
}
|
|
entriesById.remove(id);
|
|
entriesById.remove(id);
|
|
-
|
|
|
|
- // Set the path as uncached in the namesystem
|
|
|
|
- try {
|
|
|
|
- INode node = dir.getINode(existing.getDescriptor().getPath().toUri().
|
|
|
|
- getPath());
|
|
|
|
- if (node != null && node.isFile()) {
|
|
|
|
- namesystem.setCacheReplicationInt(existing.getDescriptor().getPath().
|
|
|
|
- toUri().getPath(), (short) 0);
|
|
|
|
- }
|
|
|
|
- } catch (IOException e) {
|
|
|
|
- LOG.warn("removeDescriptor " + id + ": failure while setting cache"
|
|
|
|
- + " replication factor", e);
|
|
|
|
- throw e;
|
|
|
|
|
|
+ if (monitor != null) {
|
|
|
|
+ monitor.kick();
|
|
}
|
|
}
|
|
LOG.info("removeDescriptor successful for PathCacheEntry id " + id);
|
|
LOG.info("removeDescriptor successful for PathCacheEntry id " + id);
|
|
}
|
|
}
|
|
|
|
|
|
- public synchronized BatchedListEntries<PathBasedCacheDescriptor>
|
|
|
|
|
|
+ public BatchedListEntries<PathBasedCacheDescriptor>
|
|
listPathBasedCacheDescriptors(long prevId, String filterPool,
|
|
listPathBasedCacheDescriptors(long prevId, String filterPool,
|
|
String filterPath, FSPermissionChecker pc) throws IOException {
|
|
String filterPath, FSPermissionChecker pc) throws IOException {
|
|
|
|
+ assert namesystem.hasReadOrWriteLock();
|
|
final int NUM_PRE_ALLOCATED_ENTRIES = 16;
|
|
final int NUM_PRE_ALLOCATED_ENTRIES = 16;
|
|
if (filterPath != null) {
|
|
if (filterPath != null) {
|
|
if (!DFSUtil.isValidName(filterPath)) {
|
|
if (!DFSUtil.isValidName(filterPath)) {
|
|
@@ -370,12 +420,13 @@ public final class CacheManager {
|
|
* Create a cache pool.
|
|
* Create a cache pool.
|
|
*
|
|
*
|
|
* Only the superuser should be able to call this function.
|
|
* Only the superuser should be able to call this function.
|
|
- *
|
|
|
|
- * @param info The info for the cache pool to create.
|
|
|
|
- * @return the created CachePool
|
|
|
|
|
|
+ *
|
|
|
|
+ * @param info The info for the cache pool to create.
|
|
|
|
+ * @return Information about the cache pool we created.
|
|
*/
|
|
*/
|
|
- public synchronized CachePoolInfo addCachePool(CachePoolInfo info)
|
|
|
|
|
|
+ public CachePoolInfo addCachePool(CachePoolInfo info)
|
|
throws IOException {
|
|
throws IOException {
|
|
|
|
+ assert namesystem.hasWriteLock();
|
|
CachePoolInfo.validate(info);
|
|
CachePoolInfo.validate(info);
|
|
String poolName = info.getPoolName();
|
|
String poolName = info.getPoolName();
|
|
CachePool pool = cachePools.get(poolName);
|
|
CachePool pool = cachePools.get(poolName);
|
|
@@ -384,20 +435,8 @@ public final class CacheManager {
|
|
}
|
|
}
|
|
pool = CachePool.createFromInfoAndDefaults(info);
|
|
pool = CachePool.createFromInfoAndDefaults(info);
|
|
cachePools.put(pool.getPoolName(), pool);
|
|
cachePools.put(pool.getPoolName(), pool);
|
|
- return pool.getInfo(true);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * Internal unchecked method used to add a CachePool. Called directly when
|
|
|
|
- * reloading CacheManager state from the FSImage or edit log.
|
|
|
|
- *
|
|
|
|
- * @param pool to be added
|
|
|
|
- */
|
|
|
|
- void unprotectedAddCachePool(CachePoolInfo info) {
|
|
|
|
- assert namesystem.hasWriteLock();
|
|
|
|
- CachePool pool = CachePool.createFromInfo(info);
|
|
|
|
- cachePools.put(pool.getPoolName(), pool);
|
|
|
|
LOG.info("created new cache pool " + pool);
|
|
LOG.info("created new cache pool " + pool);
|
|
|
|
+ return pool.getInfo(true);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -408,8 +447,9 @@ public final class CacheManager {
|
|
* @param info
|
|
* @param info
|
|
* The info for the cache pool to modify.
|
|
* The info for the cache pool to modify.
|
|
*/
|
|
*/
|
|
- public synchronized void modifyCachePool(CachePoolInfo info)
|
|
|
|
|
|
+ public void modifyCachePool(CachePoolInfo info)
|
|
throws IOException {
|
|
throws IOException {
|
|
|
|
+ assert namesystem.hasWriteLock();
|
|
CachePoolInfo.validate(info);
|
|
CachePoolInfo.validate(info);
|
|
String poolName = info.getPoolName();
|
|
String poolName = info.getPoolName();
|
|
CachePool pool = cachePools.get(poolName);
|
|
CachePool pool = cachePools.get(poolName);
|
|
@@ -455,8 +495,9 @@ public final class CacheManager {
|
|
* @param poolName
|
|
* @param poolName
|
|
* The name for the cache pool to remove.
|
|
* The name for the cache pool to remove.
|
|
*/
|
|
*/
|
|
- public synchronized void removeCachePool(String poolName)
|
|
|
|
|
|
+ public void removeCachePool(String poolName)
|
|
throws IOException {
|
|
throws IOException {
|
|
|
|
+ assert namesystem.hasWriteLock();
|
|
CachePoolInfo.validateName(poolName);
|
|
CachePoolInfo.validateName(poolName);
|
|
CachePool pool = cachePools.remove(poolName);
|
|
CachePool pool = cachePools.remove(poolName);
|
|
if (pool == null) {
|
|
if (pool == null) {
|
|
@@ -475,10 +516,14 @@ public final class CacheManager {
|
|
iter.remove();
|
|
iter.remove();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+ if (monitor != null) {
|
|
|
|
+ monitor.kick();
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
- public synchronized BatchedListEntries<CachePoolInfo>
|
|
|
|
|
|
+ public BatchedListEntries<CachePoolInfo>
|
|
listCachePools(FSPermissionChecker pc, String prevKey) {
|
|
listCachePools(FSPermissionChecker pc, String prevKey) {
|
|
|
|
+ assert namesystem.hasReadOrWriteLock();
|
|
final int NUM_PRE_ALLOCATED_ENTRIES = 16;
|
|
final int NUM_PRE_ALLOCATED_ENTRIES = 16;
|
|
ArrayList<CachePoolInfo> results =
|
|
ArrayList<CachePoolInfo> results =
|
|
new ArrayList<CachePoolInfo>(NUM_PRE_ALLOCATED_ENTRIES);
|
|
new ArrayList<CachePoolInfo>(NUM_PRE_ALLOCATED_ENTRIES);
|
|
@@ -497,9 +542,104 @@ public final class CacheManager {
|
|
return new BatchedListEntries<CachePoolInfo>(results, false);
|
|
return new BatchedListEntries<CachePoolInfo>(results, false);
|
|
}
|
|
}
|
|
|
|
|
|
- /*
|
|
|
|
- * FSImage related serialization and deserialization code
|
|
|
|
- */
|
|
|
|
|
|
+ public void setCachedLocations(LocatedBlock block) {
|
|
|
|
+ if (!enabled) {
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+ CachedBlock cachedBlock =
|
|
|
|
+ new CachedBlock(block.getBlock().getBlockId(),
|
|
|
|
+ (short)0, false);
|
|
|
|
+ cachedBlock = cachedBlocks.get(cachedBlock);
|
|
|
|
+ if (cachedBlock == null) {
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+ List<DatanodeDescriptor> datanodes = cachedBlock.getDatanodes(Type.CACHED);
|
|
|
|
+ for (DatanodeDescriptor datanode : datanodes) {
|
|
|
|
+ block.addCachedLoc(datanode);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public final void processCacheReport(final DatanodeID datanodeID,
|
|
|
|
+ final BlockListAsLongs report) throws IOException {
|
|
|
|
+ if (!enabled) {
|
|
|
|
+ LOG.info("Ignoring cache report from " + datanodeID +
|
|
|
|
+ " because " + DFS_NAMENODE_CACHING_ENABLED_KEY + " = false. " +
|
|
|
|
+ "number of blocks: " + report.getNumberOfBlocks());
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+ namesystem.writeLock();
|
|
|
|
+ final long startTime = Time.monotonicNow();
|
|
|
|
+ final long endTime;
|
|
|
|
+ try {
|
|
|
|
+ final DatanodeDescriptor datanode =
|
|
|
|
+ blockManager.getDatanodeManager().getDatanode(datanodeID);
|
|
|
|
+ if (datanode == null || !datanode.isAlive) {
|
|
|
|
+ throw new IOException(
|
|
|
|
+ "processCacheReport from dead or unregistered datanode: " + datanode);
|
|
|
|
+ }
|
|
|
|
+ processCacheReportImpl(datanode, report);
|
|
|
|
+ } finally {
|
|
|
|
+ endTime = Time.monotonicNow();
|
|
|
|
+ namesystem.writeUnlock();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Log the block report processing stats from Namenode perspective
|
|
|
|
+ final NameNodeMetrics metrics = NameNode.getNameNodeMetrics();
|
|
|
|
+ if (metrics != null) {
|
|
|
|
+ metrics.addCacheBlockReport((int) (endTime - startTime));
|
|
|
|
+ }
|
|
|
|
+ LOG.info("Processed cache report from "
|
|
|
|
+ + datanodeID + ", blocks: " + report.getNumberOfBlocks()
|
|
|
|
+ + ", processing time: " + (endTime - startTime) + " msecs");
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private void processCacheReportImpl(final DatanodeDescriptor datanode,
|
|
|
|
+ final BlockListAsLongs report) {
|
|
|
|
+ CachedBlocksList cached = datanode.getCached();
|
|
|
|
+ cached.clear();
|
|
|
|
+ BlockReportIterator itBR = report.getBlockReportIterator();
|
|
|
|
+ while (itBR.hasNext()) {
|
|
|
|
+ Block block = itBR.next();
|
|
|
|
+ ReplicaState iState = itBR.getCurrentReplicaState();
|
|
|
|
+ if (iState != ReplicaState.FINALIZED) {
|
|
|
|
+ LOG.error("Cached block report contained unfinalized block " + block);
|
|
|
|
+ continue;
|
|
|
|
+ }
|
|
|
|
+ BlockInfo blockInfo = blockManager.getStoredBlock(block);
|
|
|
|
+ if (blockInfo.getGenerationStamp() < block.getGenerationStamp()) {
|
|
|
|
+ // The NameNode will eventually remove or update the out-of-date block.
|
|
|
|
+ // Until then, we pretend that it isn't cached.
|
|
|
|
+ LOG.warn("Genstamp in cache report disagrees with our genstamp for " +
|
|
|
|
+ block + ": expected genstamp " + blockInfo.getGenerationStamp());
|
|
|
|
+ continue;
|
|
|
|
+ }
|
|
|
|
+ Collection<DatanodeDescriptor> corruptReplicas =
|
|
|
|
+ blockManager.getCorruptReplicas(blockInfo);
|
|
|
|
+ if ((corruptReplicas != null) && corruptReplicas.contains(datanode)) {
|
|
|
|
+ // The NameNode will eventually remove or update the corrupt block.
|
|
|
|
+ // Until then, we pretend that it isn't cached.
|
|
|
|
+ LOG.warn("Ignoring cached replica on " + datanode + " of " + block +
|
|
|
|
+ " because it is corrupt.");
|
|
|
|
+ continue;
|
|
|
|
+ }
|
|
|
|
+ CachedBlock cachedBlock =
|
|
|
|
+ new CachedBlock(block.getBlockId(), (short)0, false);
|
|
|
|
+ CachedBlock prevCachedBlock = cachedBlocks.get(cachedBlock);
|
|
|
|
+ // Use the existing CachedBlock if it's present; otherwise,
|
|
|
|
+ // insert a new one.
|
|
|
|
+ if (prevCachedBlock != null) {
|
|
|
|
+ cachedBlock = prevCachedBlock;
|
|
|
|
+ } else {
|
|
|
|
+ cachedBlocks.put(cachedBlock);
|
|
|
|
+ }
|
|
|
|
+ if (!cachedBlock.isPresent(datanode.getCached())) {
|
|
|
|
+ datanode.getCached().add(cachedBlock);
|
|
|
|
+ }
|
|
|
|
+ if (cachedBlock.isPresent(datanode.getPendingCached())) {
|
|
|
|
+ datanode.getPendingCached().remove(cachedBlock);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
|
|
/**
|
|
/**
|
|
* Saves the current state of the CacheManager to the DataOutput. Used
|
|
* Saves the current state of the CacheManager to the DataOutput. Used
|
|
@@ -508,7 +648,7 @@ public final class CacheManager {
|
|
* @param sdPath path of the storage directory
|
|
* @param sdPath path of the storage directory
|
|
* @throws IOException
|
|
* @throws IOException
|
|
*/
|
|
*/
|
|
- public synchronized void saveState(DataOutput out, String sdPath)
|
|
|
|
|
|
+ public void saveState(DataOutput out, String sdPath)
|
|
throws IOException {
|
|
throws IOException {
|
|
out.writeLong(nextEntryId);
|
|
out.writeLong(nextEntryId);
|
|
savePools(out, sdPath);
|
|
savePools(out, sdPath);
|
|
@@ -521,7 +661,8 @@ public final class CacheManager {
|
|
* @param in DataInput from which to restore state
|
|
* @param in DataInput from which to restore state
|
|
* @throws IOException
|
|
* @throws IOException
|
|
*/
|
|
*/
|
|
- public synchronized void loadState(DataInput in) throws IOException {
|
|
|
|
|
|
+ public void loadState(DataInput in) throws IOException {
|
|
|
|
+ assert namesystem.hasWriteLock();
|
|
nextEntryId = in.readLong();
|
|
nextEntryId = in.readLong();
|
|
// pools need to be loaded first since entries point to their parent pool
|
|
// pools need to be loaded first since entries point to their parent pool
|
|
loadPools(in);
|
|
loadPools(in);
|
|
@@ -531,7 +672,7 @@ public final class CacheManager {
|
|
/**
|
|
/**
|
|
* Save cache pools to fsimage
|
|
* Save cache pools to fsimage
|
|
*/
|
|
*/
|
|
- private synchronized void savePools(DataOutput out,
|
|
|
|
|
|
+ private void savePools(DataOutput out,
|
|
String sdPath) throws IOException {
|
|
String sdPath) throws IOException {
|
|
StartupProgress prog = NameNode.getStartupProgress();
|
|
StartupProgress prog = NameNode.getStartupProgress();
|
|
Step step = new Step(StepType.CACHE_POOLS, sdPath);
|
|
Step step = new Step(StepType.CACHE_POOLS, sdPath);
|
|
@@ -549,7 +690,7 @@ public final class CacheManager {
|
|
/*
|
|
/*
|
|
* Save cache entries to fsimage
|
|
* Save cache entries to fsimage
|
|
*/
|
|
*/
|
|
- private synchronized void saveEntries(DataOutput out, String sdPath)
|
|
|
|
|
|
+ private void saveEntries(DataOutput out, String sdPath)
|
|
throws IOException {
|
|
throws IOException {
|
|
StartupProgress prog = NameNode.getStartupProgress();
|
|
StartupProgress prog = NameNode.getStartupProgress();
|
|
Step step = new Step(StepType.CACHE_ENTRIES, sdPath);
|
|
Step step = new Step(StepType.CACHE_ENTRIES, sdPath);
|
|
@@ -560,6 +701,7 @@ public final class CacheManager {
|
|
for (PathBasedCacheEntry entry: entriesById.values()) {
|
|
for (PathBasedCacheEntry entry: entriesById.values()) {
|
|
out.writeLong(entry.getEntryId());
|
|
out.writeLong(entry.getEntryId());
|
|
Text.writeString(out, entry.getPath());
|
|
Text.writeString(out, entry.getPath());
|
|
|
|
+ out.writeShort(entry.getReplication());
|
|
Text.writeString(out, entry.getPool().getPoolName());
|
|
Text.writeString(out, entry.getPool().getPoolName());
|
|
counter.increment();
|
|
counter.increment();
|
|
}
|
|
}
|
|
@@ -569,7 +711,7 @@ public final class CacheManager {
|
|
/**
|
|
/**
|
|
* Load cache pools from fsimage
|
|
* Load cache pools from fsimage
|
|
*/
|
|
*/
|
|
- private synchronized void loadPools(DataInput in)
|
|
|
|
|
|
+ private void loadPools(DataInput in)
|
|
throws IOException {
|
|
throws IOException {
|
|
StartupProgress prog = NameNode.getStartupProgress();
|
|
StartupProgress prog = NameNode.getStartupProgress();
|
|
Step step = new Step(StepType.CACHE_POOLS);
|
|
Step step = new Step(StepType.CACHE_POOLS);
|
|
@@ -578,8 +720,7 @@ public final class CacheManager {
|
|
prog.setTotal(Phase.LOADING_FSIMAGE, step, numberOfPools);
|
|
prog.setTotal(Phase.LOADING_FSIMAGE, step, numberOfPools);
|
|
Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, step);
|
|
Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, step);
|
|
for (int i = 0; i < numberOfPools; i++) {
|
|
for (int i = 0; i < numberOfPools; i++) {
|
|
- CachePoolInfo info = CachePoolInfo.readFrom(in);
|
|
|
|
- unprotectedAddCachePool(info);
|
|
|
|
|
|
+ addCachePool(CachePoolInfo.readFrom(in));
|
|
counter.increment();
|
|
counter.increment();
|
|
}
|
|
}
|
|
prog.endStep(Phase.LOADING_FSIMAGE, step);
|
|
prog.endStep(Phase.LOADING_FSIMAGE, step);
|
|
@@ -588,7 +729,7 @@ public final class CacheManager {
|
|
/**
|
|
/**
|
|
* Load cache entries from the fsimage
|
|
* Load cache entries from the fsimage
|
|
*/
|
|
*/
|
|
- private synchronized void loadEntries(DataInput in) throws IOException {
|
|
|
|
|
|
+ private void loadEntries(DataInput in) throws IOException {
|
|
StartupProgress prog = NameNode.getStartupProgress();
|
|
StartupProgress prog = NameNode.getStartupProgress();
|
|
Step step = new Step(StepType.CACHE_ENTRIES);
|
|
Step step = new Step(StepType.CACHE_ENTRIES);
|
|
prog.beginStep(Phase.LOADING_FSIMAGE, step);
|
|
prog.beginStep(Phase.LOADING_FSIMAGE, step);
|
|
@@ -602,12 +743,24 @@ public final class CacheManager {
|
|
String poolName = Text.readString(in);
|
|
String poolName = Text.readString(in);
|
|
// Get pool reference by looking it up in the map
|
|
// Get pool reference by looking it up in the map
|
|
CachePool pool = cachePools.get(poolName);
|
|
CachePool pool = cachePools.get(poolName);
|
|
|
|
+ if (pool != null) {
|
|
|
|
+ throw new IOException("Entry refers to pool " + poolName +
|
|
|
|
+ ", which does not exist.");
|
|
|
|
+ }
|
|
PathBasedCacheEntry entry =
|
|
PathBasedCacheEntry entry =
|
|
- new PathBasedCacheEntry(entryId, path, replication, pool);
|
|
|
|
- unprotectedAddEntry(entry);
|
|
|
|
|
|
+ new PathBasedCacheEntry(entryId, path, replication, pool);
|
|
|
|
+ if (entriesById.put(entry.getEntryId(), entry) != null) {
|
|
|
|
+ throw new IOException("An entry with ID " + entry.getEntryId() +
|
|
|
|
+ " already exists");
|
|
|
|
+ }
|
|
|
|
+ List<PathBasedCacheEntry> entries = entriesByPath.get(entry.getPath());
|
|
|
|
+ if (entries == null) {
|
|
|
|
+ entries = new LinkedList<PathBasedCacheEntry>();
|
|
|
|
+ entriesByPath.put(entry.getPath(), entries);
|
|
|
|
+ }
|
|
|
|
+ entries.add(entry);
|
|
counter.increment();
|
|
counter.increment();
|
|
}
|
|
}
|
|
prog.endStep(Phase.LOADING_FSIMAGE, step);
|
|
prog.endStep(Phase.LOADING_FSIMAGE, step);
|
|
}
|
|
}
|
|
-
|
|
|
|
}
|
|
}
|