@@ -17,46 +17,21 @@
  */
 package org.apache.hadoop.hdfs.server.mover;
 
-import java.io.IOException;
-import java.net.URI;
-import java.text.DateFormat;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.EnumMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.BlockStoragePolicy;
-import org.apache.hadoop.hdfs.DFSClient;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.StorageType;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.DirectoryListing;
-import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
-import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.*;
+import org.apache.hadoop.hdfs.protocol.*;
 import org.apache.hadoop.hdfs.server.balancer.Dispatcher;
-import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DBlock;
-import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DDatanode;
+import org.apache.hadoop.hdfs.server.balancer.Dispatcher.*;
 import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DDatanode.StorageGroup;
-import org.apache.hadoop.hdfs.server.balancer.Dispatcher.PendingMove;
-import org.apache.hadoop.hdfs.server.balancer.Dispatcher.Source;
-import org.apache.hadoop.hdfs.server.balancer.Dispatcher.StorageGroupMap;
 import org.apache.hadoop.hdfs.server.balancer.ExitStatus;
 import org.apache.hadoop.hdfs.server.balancer.Matcher;
 import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
+import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.io.IOUtils;
@@ -66,6 +41,11 @@ import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
+import java.io.IOException;
+import java.net.URI;
+import java.text.DateFormat;
+import java.util.*;
+
 @InterfaceAudience.Private
 public class Mover {
   static final Log LOG = LogFactory.getLog(Mover.class);
@@ -173,14 +153,67 @@ public class Mover {
     return max;
   }
 
+  /**
+   * Convert a snapshot path to the corresponding non-snapshot path, e.g.,
+   * /foo/.snapshot/snapshot-name/bar --> /foo/bar
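+   * (the components are as produced by INode.getPathNames(), which for an
+   * absolute path typically begin with the empty root component).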
+   */
+  private static String convertSnapshotPath(String[] pathComponents) {
+    StringBuilder sb = new StringBuilder(Path.SEPARATOR);
+    for (int i = 0; i < pathComponents.length; i++) {
+      if (pathComponents[i].equals(HdfsConstants.DOT_SNAPSHOT_DIR)) {
+        i++; // skip ".snapshot" and the snapshot name right after it
+      } else {
+        if (sb.length() > 1) { // add a separator between path components
+          sb.append(Path.SEPARATOR);
+        }
+        sb.append(pathComponents[i]);
+      }
+    }
+    return sb.toString();
+  }
+
   private class Processor {
     private final DFSClient dfs;
-
+    private final List<String> snapshottableDirs = new ArrayList<String>();
+
     private Processor() {
       dfs = dispatcher.getDistributedFileSystem().getClient();
     }
-
+
+    private void getSnapshottableDirs() {
+      SnapshottableDirectoryStatus[] dirs = null;
+      try {
+        dirs = dfs.getSnapshottableDirListing();
+      } catch (IOException e) {
+        LOG.warn("Failed to get snapshottable directories."
+            + " Ignore and continue.", e);
+      }
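+      // a failure above is tolerated: dirs stays null, snapshottableDirs
+      // stays empty, and the scan simply visits no ".snapshot" subtrees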
+      if (dirs != null) {
+        for (SnapshottableDirectoryStatus dir : dirs) {
+          snapshottableDirs.add(dir.getFullPath().toString());
+        }
+      }
+    }
+
+    /**
+     * @return true if the given path is a snapshot path and the corresponding
+     * INode is still in the current fsdirectory.
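+     * Such a path is skipped by the caller: the file is migrated when the
+     * scan visits it via its non-snapshot path, so its blocks move only once.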
+     */
+    private boolean isSnapshotPathInCurrent(String path) throws IOException {
+      // a snapshot path contains "/.snapshot/" as an intermediate component
+      if (path.contains(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR_SEPARATOR)) {
+        String[] pathComponents = INode.getPathNames(path);
+        if (HdfsConstants.DOT_SNAPSHOT_DIR
+            .equals(pathComponents[pathComponents.length - 2])) {
+          // this is a path for a specific snapshot (e.g., /foo/.snapshot/s1)
+          return false;
+        }
+        String nonSnapshotPath = convertSnapshotPath(pathComponents);
+        return dfs.getFileInfo(nonSnapshotPath) != null;
+      } else {
+        return false;
+      }
+    }
+
     private void processNamespace() {
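+      // collect the snapshottable directories up front so the recursive scan
+      // below knows where ".snapshot" subtrees may appear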
+      getSnapshottableDirs();
       try {
         processDirRecursively("", dfs.getFileInfo("/"));
       } catch (IOException e) {
@@ -188,37 +221,57 @@ public class Mover {
       }
     }
 
+    private void processChildrenList(String fullPath) {
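+      // page through the directory: each listPaths() call returns a partial
+      // listing plus a cursor (the last returned name), and needLocation=true
+      // asks the NameNode to include block locations with each file status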
+      for (byte[] lastReturnedName = HdfsFileStatus.EMPTY_NAME;;) {
+        final DirectoryListing children;
+        try {
+          children = dfs.listPaths(fullPath, lastReturnedName, true);
+        } catch (IOException e) {
+          LOG.warn("Failed to list directory " + fullPath
+              + ". Ignore the directory and continue.", e);
+          return;
+        }
+        if (children == null) {
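+          // a null listing means fullPath no longer exists (e.g. it was
+          // deleted after this scan started); skip it and continue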
+          return;
+        }
+        for (HdfsFileStatus child : children.getPartialListing()) {
+          processDirRecursively(fullPath, child);
+        }
+        if (children.hasMore()) {
+          lastReturnedName = children.getLastName();
+        } else {
+          return;
+        }
+      }
+    }
+
     private void processDirRecursively(String parent, HdfsFileStatus status) {
+      String fullPath = status.getFullName(parent);
       if (status.isSymlink()) {
         return; //ignore symlinks
       } else if (status.isDir()) {
-        String dir = status.getFullName(parent);
-        if (!dir.endsWith(Path.SEPARATOR)) {
-          dir = dir + Path.SEPARATOR;
+        if (!fullPath.endsWith(Path.SEPARATOR)) {
+          fullPath = fullPath + Path.SEPARATOR;
         }
 
-        for(byte[] lastReturnedName = HdfsFileStatus.EMPTY_NAME;;) {
-          final DirectoryListing children;
-          try {
-            children = dfs.listPaths(dir, lastReturnedName, true);
-          } catch(IOException e) {
-            LOG.warn("Failed to list directory " + dir
-                + ". Ignore the directory and continue.", e);
-            return;
-          }
-          if (children == null) {
-            return;
-          }
-          for (HdfsFileStatus child : children.getPartialListing()) {
-            processDirRecursively(dir, child);
-          }
-          if (children.hasMore()) {
-            lastReturnedName = children.getLastName();
-          } else {
+        processChildrenList(fullPath);
+        // process snapshots if this is a snapshottable directory
+        if (snapshottableDirs.contains(fullPath)) {
+          final String dirSnapshot = fullPath + HdfsConstants.DOT_SNAPSHOT_DIR;
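+          // fullPath ends with "/" at this point, so this yields
+          // e.g. "/foo/.snapshot"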
+          processChildrenList(dirSnapshot);
+        }
+      } else { // file
+        try {
+          if (isSnapshotPathInCurrent(fullPath)) {
+            // the full path is a snapshot path but it is also included in the
+            // current directory tree, thus ignore it.
             return;
           }
+        } catch (IOException e) {
+          LOG.warn("Failed to check the status of " + fullPath
+              + ". Ignore it and continue.", e);
+          return;
         }
-      } else { // file
         processFile(parent, (HdfsLocatedFileStatus)status);
       }
     }