|
@@ -39,6 +39,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
|
|
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
|
|
import org.apache.hadoop.fs.FileStatus;
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
|
+import org.apache.hadoop.fs.FsShell;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.fs.permission.AccessControlException;
|
|
|
import org.apache.hadoop.fs.permission.FsPermission;
|
|
@@ -92,6 +93,7 @@ public class DistCp implements Tool {
|
|
|
"\n-f <urilist_uri> Use list at <urilist_uri> as src list" +
|
|
|
"\n-filelimit <n> Limit the total number of files to be <= n" +
|
|
|
"\n-sizelimit <n> Limit the total size to be <= n bytes" +
|
|
|
+ "\n-delete Delete the files existing in the dst but not in src" +
|
|
|
|
|
|
"\n\nNOTE 1: if -overwrite or -update are set, each source URI is " +
|
|
|
"\n interpreted as an isomorphic update to an existing directory." +
|
|
@@ -114,6 +116,7 @@ public class DistCp implements Tool {
|
|
|
|
|
|
static enum Counter { COPY, SKIP, FAIL, BYTESCOPIED, BYTESEXPECTED }
|
|
|
static enum Options {
|
|
|
+ DELETE("-delete", NAME + ".delete"),
|
|
|
FILE_LIMIT("-filelimit", NAME + ".limit.file"),
|
|
|
SIZE_LIMIT("-sizelimit", NAME + ".limit.size"),
|
|
|
IGNORE_READ_FAILURES("-i", NAME + ".ignore.read.failures"),
|
|
@@ -793,9 +796,17 @@ public class DistCp implements Tool {
|
|
|
+ (dst == null ? "dst path" : "src"));
|
|
|
}
|
|
|
// incompatible command-line flags
|
|
|
- if (flags.contains(Options.OVERWRITE) && flags.contains(Options.UPDATE)) {
|
|
|
+ final boolean isOverwrite = flags.contains(Options.OVERWRITE);
|
|
|
+ final boolean isUpdate = flags.contains(Options.UPDATE);
|
|
|
+ final boolean isDelete = flags.contains(Options.DELETE);
|
|
|
+ if (isOverwrite && isUpdate) {
|
|
|
throw new IllegalArgumentException("Conflicting overwrite policies");
|
|
|
}
|
|
|
+ if (isDelete && !isOverwrite && !isUpdate) {
|
|
|
+ throw new IllegalArgumentException(Options.DELETE.cmd
|
|
|
+ + " must be specified with " + Options.OVERWRITE + " or "
|
|
|
+ + Options.UPDATE + ".");
|
|
|
+ }
|
|
|
return new Arguments(srcs, dst, log, flags, presevedAttributes,
|
|
|
filelimit, sizelimit);
|
|
|
}
|
|
@@ -1087,15 +1098,27 @@ public class DistCp implements Tool {
|
|
|
checkAndClose(dir_writer);
|
|
|
}
|
|
|
|
|
|
+ FileStatus dststatus = null;
|
|
|
+ try {
|
|
|
+ dststatus = dstfs.getFileStatus(args.dst);
|
|
|
+ } catch(FileNotFoundException fnfe) {
|
|
|
+ LOG.info(args.dst + " does not exist.");
|
|
|
+ }
|
|
|
+
|
|
|
// create dest path dir if copying > 1 file
|
|
|
- if (!dstfs.exists(args.dst)) {
|
|
|
+ if (dststatus == null) {
|
|
|
if (srcCount > 1 && !dstfs.mkdirs(args.dst)) {
|
|
|
throw new IOException("Failed to create" + args.dst);
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ final Path sorted = new Path(jobDirectory, "_distcp_sorted");
|
|
|
+ checkDuplication(jobfs, dstfilelist, sorted, conf);
|
|
|
|
|
|
- checkDuplication(jobfs, dstfilelist,
|
|
|
- new Path(jobDirectory, "_distcp_sorted"), conf);
|
|
|
+ if (dststatus != null && args.flags.contains(Options.DELETE)) {
|
|
|
+ deleteNonexisting(dstfs, dststatus, sorted,
|
|
|
+ jobfs, jobDirectory, jobConf, conf);
|
|
|
+ }
|
|
|
|
|
|
Path tmpDir = new Path(
|
|
|
(dstExists && !dstIsDir) || (!dstExists && srcCount == 1)?
|
|
@@ -1121,7 +1144,106 @@ public class DistCp implements Tool {
|
|
|
}
|
|
|
return src.getLen() == dst.getLen();
|
|
|
}
|
|
|
+
|
|
|
+ /** Delete the dst files/dirs which do not exist in src */
|
|
|
+ static private void deleteNonexisting(
|
|
|
+ FileSystem dstfs, FileStatus dstroot, Path dstsorted,
|
|
|
+ FileSystem jobfs, Path jobdir, JobConf jobconf, Configuration conf
|
|
|
+ ) throws IOException {
|
|
|
+ if (!dstroot.isDir()) {
|
|
|
+ throw new IOException("dst must be a directory when option "
|
|
|
+ + Options.DELETE.cmd + " is set, but dst (= " + dstroot.getPath()
|
|
|
+ + ") is not a directory.");
|
|
|
+ }
|
|
|
+
|
|
|
+ //write dst lsr results
|
|
|
+ final Path dstlsr = new Path(jobdir, "_distcp_dst_lsr");
|
|
|
+ final SequenceFile.Writer writer = SequenceFile.createWriter(jobfs, jobconf,
|
|
|
+ dstlsr, Text.class, FileStatus.class,
|
|
|
+ SequenceFile.CompressionType.NONE);
|
|
|
+ try {
|
|
|
+ //do lsr to get all file statuses in dstroot
|
|
|
+ final Stack<FileStatus> lsrstack = new Stack<FileStatus>();
|
|
|
+ for(lsrstack.push(dstroot); !lsrstack.isEmpty(); ) {
|
|
|
+ final FileStatus status = lsrstack.pop();
|
|
|
+ if (status.isDir()) {
|
|
|
+ for(FileStatus child : dstfs.listStatus(status.getPath())) {
|
|
|
+ String relative = makeRelative(dstroot.getPath(), child.getPath());
|
|
|
+ writer.append(new Text(relative), child);
|
|
|
+ lsrstack.push(child);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ checkAndClose(writer);
|
|
|
+ }
|
|
|
+
|
|
|
+ //sort lsr results
|
|
|
+ final Path sortedlsr = new Path(jobdir, "_distcp_dst_lsr_sorted");
|
|
|
+ SequenceFile.Sorter sorter = new SequenceFile.Sorter(jobfs,
|
|
|
+ new Text.Comparator(), Text.class, FileStatus.class, jobconf);
|
|
|
+ sorter.sort(dstlsr, sortedlsr);
|
|
|
|
|
|
+ //compare lsr list and dst list
|
|
|
+ SequenceFile.Reader lsrin = null;
|
|
|
+ SequenceFile.Reader dstin = null;
|
|
|
+ try {
|
|
|
+ lsrin = new SequenceFile.Reader(jobfs, sortedlsr, jobconf);
|
|
|
+ dstin = new SequenceFile.Reader(jobfs, dstsorted, jobconf);
|
|
|
+
|
|
|
+ //compare sorted lsr list and sorted dst list
|
|
|
+ final Text lsrpath = new Text();
|
|
|
+ final FileStatus lsrstatus = new FileStatus();
|
|
|
+ final Text dstpath = new Text();
|
|
|
+ final Text dstfrom = new Text();
|
|
|
+ final FsShell shell = new FsShell(conf);
|
|
|
+ final String[] shellargs = {"-rmr", null};
|
|
|
+
|
|
|
+ boolean hasnext = dstin.next(dstpath, dstfrom);
|
|
|
+ for(; lsrin.next(lsrpath, lsrstatus); ) {
|
|
|
+ int dst_cmp_lsr = dstpath.compareTo(lsrpath);
|
|
|
+ for(; hasnext && dst_cmp_lsr < 0; ) {
|
|
|
+ hasnext = dstin.next(dstpath, dstfrom);
|
|
|
+ dst_cmp_lsr = dstpath.compareTo(lsrpath);
|
|
|
+ }
|
|
|
+
|
|
|
+ if (dst_cmp_lsr == 0) {
|
|
|
+ //lsrpath exists in dst, skip it
|
|
|
+ hasnext = dstin.next(dstpath, dstfrom);
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ //lsrpath does not exist, delete it
|
|
|
+ String s = new Path(dstroot.getPath(), lsrpath.toString()).toString();
|
|
|
+ if (shellargs[1] == null || !isAncestorPath(shellargs[1], s)) {
|
|
|
+ shellargs[1] = s;
|
|
|
+ int r = 0;
|
|
|
+ try {
|
|
|
+ r = shell.run(shellargs);
|
|
|
+ } catch(Exception e) {
|
|
|
+ throw new IOException("Exception from shell.", e);
|
|
|
+ }
|
|
|
+ if (r != 0) {
|
|
|
+ throw new IOException("\"" + shellargs[0] + " " + shellargs[1]
|
|
|
+ + "\" returns non-zero value " + r);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ checkAndClose(lsrin);
|
|
|
+ checkAndClose(dstin);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ //is x an ancestor path of y?
|
|
|
+ static private boolean isAncestorPath(String x, String y) {
|
|
|
+ if (!y.startsWith(x)) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ final int len = x.length();
|
|
|
+ return y.length() == len || y.charAt(len) == Path.SEPARATOR_CHAR;
|
|
|
+ }
|
|
|
+
|
|
|
/** Check whether the file list have duplication. */
|
|
|
static private void checkDuplication(FileSystem fs, Path file, Path sorted,
|
|
|
Configuration conf) throws IOException {
|