|
@@ -42,6 +42,7 @@ import org.apache.hadoop.fs.FileStatus;
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
|
import org.apache.hadoop.fs.FsShell;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
+import org.apache.hadoop.fs.Trash;
|
|
|
import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
import org.apache.hadoop.hdfs.HftpFileSystem;
|
|
|
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
|
|
@@ -49,6 +50,7 @@ import org.apache.hadoop.io.LongWritable;
|
|
|
import org.apache.hadoop.io.SequenceFile;
|
|
|
import org.apache.hadoop.io.Text;
|
|
|
import org.apache.hadoop.io.Writable;
|
|
|
+import org.apache.hadoop.io.NullWritable;
|
|
|
import org.apache.hadoop.io.WritableComparable;
|
|
|
import org.apache.hadoop.ipc.RemoteException;
|
|
|
import org.apache.hadoop.mapred.FileOutputFormat;
|
|
@@ -1289,7 +1291,7 @@ public class DistCp implements Tool {
|
|
|
final FileStatus jobDirStat = jobfs.getFileStatus(jobdir);
|
|
|
final Path dstlsr = new Path(jobdir, "_distcp_dst_lsr");
|
|
|
final SequenceFile.Writer writer = SequenceFile.createWriter(jobfs, jobconf,
|
|
|
- dstlsr, Text.class, dstroot.getClass(),
|
|
|
+ dstlsr, Text.class, NullWritable.class,
|
|
|
SequenceFile.CompressionType.NONE);
|
|
|
try {
|
|
|
// do lsr to get all file statuses in dstroot
|
|
@@ -1302,7 +1304,7 @@ public class DistCp implements Tool {
|
|
|
}
|
|
|
for(FileStatus child : dstfs.listStatus(status.getPath())) {
|
|
|
String relative = makeRelative(dstroot.getPath(), child.getPath());
|
|
|
- writer.append(new Text(relative), child);
|
|
|
+ writer.append(new Text(relative), NullWritable.get());
|
|
|
lsrstack.push(child);
|
|
|
}
|
|
|
}
|
|
@@ -1314,7 +1316,7 @@ public class DistCp implements Tool {
|
|
|
// sort lsr results
|
|
|
final Path sortedlsr = new Path(jobdir, "_distcp_dst_lsr_sorted");
|
|
|
SequenceFile.Sorter sorter = new SequenceFile.Sorter(jobfs,
|
|
|
- new Text.Comparator(), Text.class, FileStatus.class, jobconf);
|
|
|
+ new Text.Comparator(), Text.class, NullWritable.class, jobconf);
|
|
|
sorter.sort(dstlsr, sortedlsr);
|
|
|
|
|
|
// compare lsr list and dst list
|
|
@@ -1327,14 +1329,13 @@ public class DistCp implements Tool {
|
|
|
|
|
|
// compare sorted lsr list and sorted dst list
|
|
|
final Text lsrpath = new Text();
|
|
|
- final FileStatus lsrstatus = new FileStatus();
|
|
|
final Text dstpath = new Text();
|
|
|
final Text dstfrom = new Text();
|
|
|
- final FsShell shell = new FsShell(conf);
|
|
|
- final String[] shellargs = {"-rmr", null};
|
|
|
+ final Trash trash = new Trash(dstfs, conf);
|
|
|
+ Path lastpath = null;
|
|
|
|
|
|
boolean hasnext = dstin.next(dstpath, dstfrom);
|
|
|
- for(; lsrin.next(lsrpath, lsrstatus); ) {
|
|
|
+ while (lsrin.next(lsrpath, NullWritable.get())) {
|
|
|
//
|
|
|
// check if lsrpath is in dst (represented here by dstsorted, which
|
|
|
// contains files and dirs to be copied from the source to destination),
|
|
@@ -1342,7 +1343,7 @@ public class DistCp implements Tool {
|
|
|
// ancestor.
|
|
|
//
|
|
|
int dst_cmp_lsr = dstpath.compareTo(lsrpath);
|
|
|
- for(; hasnext && dst_cmp_lsr < 0; ) {
|
|
|
+ while (hasnext && dst_cmp_lsr < 0) {
|
|
|
hasnext = dstin.next(dstpath, dstfrom);
|
|
|
dst_cmp_lsr = dstpath.compareTo(lsrpath);
|
|
|
}
|
|
@@ -1354,29 +1355,22 @@ public class DistCp implements Tool {
|
|
|
else {
|
|
|
// lsrpath does not exist in dst, delete it if it's not jobDir or
|
|
|
// jobDir's ancestor
|
|
|
- String s = new Path(dstroot.getPath(), lsrpath.toString()).toString();
|
|
|
+ final Path rmpath = new Path(dstroot.getPath(), lsrpath.toString());
|
|
|
if (needToFilterJobDir) {
|
|
|
- int cmpJobDir = s.compareTo(jobDirStr);
|
|
|
+ int cmpJobDir = rmpath.toString().compareTo(jobDirStr);
|
|
|
if (cmpJobDir > 0) {
|
|
|
// do nothing
|
|
|
- } else if (cmpJobDir == 0 || isAncestorPath(s, jobDirStr)) {
|
|
|
+ } else if (cmpJobDir == 0 || isAncestorPath(rmpath, jobdir)) {
|
|
|
continue;
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- if (shellargs[1] == null || !isAncestorPath(shellargs[1], s)) {
|
|
|
- shellargs[1] = s;
|
|
|
- int r = 0;
|
|
|
- try {
|
|
|
- r = shell.run(shellargs);
|
|
|
- } catch(Exception e) {
|
|
|
- throw new IOException("Exception from shell.", e);
|
|
|
+
|
|
|
+ if ((lastpath == null || !isAncestorPath(lastpath, rmpath))) {
|
|
|
+ if (!(trash.moveToTrash(rmpath) || dstfs.delete(rmpath, true))) {
|
|
|
+ throw new IOException("Failed to delete " + rmpath);
|
|
|
}
|
|
|
- if (r != 0) {
|
|
|
- throw new IOException("\"" + shellargs[0] + " " + shellargs[1]
|
|
|
- + "\" returns non-zero value " + r);
|
|
|
- }
|
|
|
- }
|
|
|
+ lastpath = rmpath;
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
} finally {
|
|
@@ -1385,8 +1379,10 @@ public class DistCp implements Tool {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- //is x an ancestor path of y?
|
|
|
- static private boolean isAncestorPath(String x, String y) {
|
|
|
+ // is xp an ancestor path of yp?
|
|
|
+ static private boolean isAncestorPath(Path xp, Path yp) {
|
|
|
+ final String x = xp.toString();
|
|
|
+ final String y = yp.toString();
|
|
|
if (!y.startsWith(x)) {
|
|
|
return false;
|
|
|
}
|