|
@@ -20,11 +20,9 @@ package org.apache.hadoop.util;
|
|
|
|
|
|
import java.io.BufferedReader;
|
|
import java.io.BufferedReader;
|
|
import java.io.DataInput;
|
|
import java.io.DataInput;
|
|
-import java.io.DataInputStream;
|
|
|
|
import java.io.DataOutput;
|
|
import java.io.DataOutput;
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
import java.io.InputStreamReader;
|
|
import java.io.InputStreamReader;
|
|
-import java.text.DecimalFormat;
|
|
|
|
import java.util.*;
|
|
import java.util.*;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.Log;
|
|
@@ -36,6 +34,7 @@ import org.apache.hadoop.fs.FileStatus;
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
import org.apache.hadoop.fs.FileUtil;
|
|
import org.apache.hadoop.fs.FileUtil;
|
|
import org.apache.hadoop.fs.Path;
|
|
import org.apache.hadoop.fs.Path;
|
|
|
|
+import org.apache.hadoop.fs.permission.FsPermission;
|
|
import org.apache.hadoop.io.LongWritable;
|
|
import org.apache.hadoop.io.LongWritable;
|
|
import org.apache.hadoop.io.SequenceFile;
|
|
import org.apache.hadoop.io.SequenceFile;
|
|
import org.apache.hadoop.io.Text;
|
|
import org.apache.hadoop.io.Text;
|
|
@@ -65,7 +64,13 @@ public class CopyFiles implements Tool {
|
|
private static final String usage = NAME
|
|
private static final String usage = NAME
|
|
+ " [OPTIONS] <srcurl>* <desturl>" +
|
|
+ " [OPTIONS] <srcurl>* <desturl>" +
|
|
"\n\nOPTIONS:" +
|
|
"\n\nOPTIONS:" +
|
|
- "\n-p Preserve status" +
|
|
|
|
|
|
+ "\n-p[rbugp] Preserve status" +
|
|
|
|
+ "\n r: replication number" +
|
|
|
|
+ "\n b: block size" +
|
|
|
|
+ "\n u: user" +
|
|
|
|
+ "\n g: group" +
|
|
|
|
+ "\n p: permission" +
|
|
|
|
+ "\n -p alone is equivalent to -prbugp" +
|
|
"\n-i Ignore failures" +
|
|
"\n-i Ignore failures" +
|
|
"\n-log <logdir> Write logs to <logdir>" +
|
|
"\n-log <logdir> Write logs to <logdir>" +
|
|
"\n-overwrite Overwrite destination" +
|
|
"\n-overwrite Overwrite destination" +
|
|
@@ -86,7 +91,7 @@ public class CopyFiles implements Tool {
|
|
static enum Counter { COPY, SKIP, FAIL, BYTESCOPIED, BYTESEXPECTED }
|
|
static enum Counter { COPY, SKIP, FAIL, BYTESCOPIED, BYTESEXPECTED }
|
|
static enum Options {
|
|
static enum Options {
|
|
IGNORE_READ_FAILURES("-i", NAME + ".ignore.read.failures"),
|
|
IGNORE_READ_FAILURES("-i", NAME + ".ignore.read.failures"),
|
|
- PRESERVE_STATUS("-p", NAME + ".preserve.status.info"),
|
|
|
|
|
|
+ PRESERVE_STATUS("-p", NAME + ".preserve.status"),
|
|
OVERWRITE("-overwrite", NAME + ".overwrite.always"),
|
|
OVERWRITE("-overwrite", NAME + ".overwrite.always"),
|
|
UPDATE("-update", NAME + ".overwrite.ifnewer");
|
|
UPDATE("-update", NAME + ".overwrite.ifnewer");
|
|
|
|
|
|
@@ -97,6 +102,38 @@ public class CopyFiles implements Tool {
|
|
this.propertyname = propertyname;
|
|
this.propertyname = propertyname;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+ static enum FileAttribute {
|
|
|
|
+ BLOCK_SIZE, REPLICATION, USER, GROUP, PERMISSION;
|
|
|
|
+
|
|
|
|
+ final char symbol;
|
|
|
|
+
|
|
|
|
+ private FileAttribute() {symbol = toString().toLowerCase().charAt(0);}
|
|
|
|
+
|
|
|
|
+ static EnumSet<FileAttribute> parse(String s) {
|
|
|
|
+ if (s == null || s.length() == 0) {
|
|
|
|
+ return EnumSet.allOf(FileAttribute.class);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ EnumSet<FileAttribute> set = EnumSet.noneOf(FileAttribute.class);
|
|
|
|
+ FileAttribute[] attributes = values();
|
|
|
|
+ for(char c : s.toCharArray()) {
|
|
|
|
+ int i = 0;
|
|
|
|
+ for(; i < attributes.length && c != attributes[i].symbol; i++);
|
|
|
|
+ if (i < attributes.length) {
|
|
|
|
+ if (!set.contains(attributes[i])) {
|
|
|
|
+ set.add(attributes[i]);
|
|
|
|
+ } else {
|
|
|
|
+ throw new IllegalArgumentException("There are more than one '"
|
|
|
|
+ + attributes[i].symbol + "' in " + s);
|
|
|
|
+ }
|
|
|
|
+ } else {
|
|
|
|
+ throw new IllegalArgumentException("'" + c + "' in " + s
|
|
|
|
+ + " is undefined.");
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ return set;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
|
|
static final String TMP_DIR_LABEL = NAME + ".tmp.dir";
|
|
static final String TMP_DIR_LABEL = NAME + ".tmp.dir";
|
|
static final String DST_DIR_LABEL = NAME + ".dest.path";
|
|
static final String DST_DIR_LABEL = NAME + ".dest.path";
|
|
@@ -104,6 +141,9 @@ public class CopyFiles implements Tool {
|
|
static final String SRC_LIST_LABEL = NAME + ".src.list";
|
|
static final String SRC_LIST_LABEL = NAME + ".src.list";
|
|
static final String SRC_COUNT_LABEL = NAME + ".src.count";
|
|
static final String SRC_COUNT_LABEL = NAME + ".src.count";
|
|
static final String TOTAL_SIZE_LABEL = NAME + ".total.size";
|
|
static final String TOTAL_SIZE_LABEL = NAME + ".total.size";
|
|
|
|
+ static final String DST_DIR_LIST_LABEL = NAME + ".dst.dir.list";
|
|
|
|
+ static final String PRESERVE_STATUS_LABEL
|
|
|
|
+ = Options.PRESERVE_STATUS.propertyname + ".value";
|
|
|
|
|
|
private JobConf conf;
|
|
private JobConf conf;
|
|
|
|
|
|
@@ -128,22 +168,22 @@ public class CopyFiles implements Tool {
|
|
*/
|
|
*/
|
|
static class FilePair implements Writable {
|
|
static class FilePair implements Writable {
|
|
FileStatus input = new FileStatus();
|
|
FileStatus input = new FileStatus();
|
|
- Path output;
|
|
|
|
|
|
+ String output;
|
|
FilePair() { }
|
|
FilePair() { }
|
|
- FilePair(FileStatus input, Path output) {
|
|
|
|
|
|
+ FilePair(FileStatus input, String output) {
|
|
this.input = input;
|
|
this.input = input;
|
|
this.output = output;
|
|
this.output = output;
|
|
}
|
|
}
|
|
public void readFields(DataInput in) throws IOException {
|
|
public void readFields(DataInput in) throws IOException {
|
|
input.readFields(in);
|
|
input.readFields(in);
|
|
- output = new Path(Text.readString(in));
|
|
|
|
|
|
+ output = Text.readString(in);
|
|
}
|
|
}
|
|
public void write(DataOutput out) throws IOException {
|
|
public void write(DataOutput out) throws IOException {
|
|
input.write(out);
|
|
input.write(out);
|
|
- Text.writeString(out, output.toString());
|
|
|
|
|
|
+ Text.writeString(out, output);
|
|
}
|
|
}
|
|
public String toString() {
|
|
public String toString() {
|
|
- return input.toString() + " : " + output.toString();
|
|
|
|
|
|
+ return input + " : " + output;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -224,13 +264,14 @@ public class CopyFiles implements Tool {
|
|
/**
|
|
/**
|
|
* FSCopyFilesMapper: The mapper for copying files between FileSystems.
|
|
* FSCopyFilesMapper: The mapper for copying files between FileSystems.
|
|
*/
|
|
*/
|
|
- public static class FSCopyFilesMapper
|
|
|
|
|
|
+ static class CopyFilesMapper
|
|
implements Mapper<LongWritable, FilePair, WritableComparable, Text> {
|
|
implements Mapper<LongWritable, FilePair, WritableComparable, Text> {
|
|
// config
|
|
// config
|
|
private int sizeBuf = 128 * 1024;
|
|
private int sizeBuf = 128 * 1024;
|
|
private FileSystem destFileSys = null;
|
|
private FileSystem destFileSys = null;
|
|
private boolean ignoreReadFailures;
|
|
private boolean ignoreReadFailures;
|
|
private boolean preserve_status;
|
|
private boolean preserve_status;
|
|
|
|
+ private EnumSet<FileAttribute> preseved;
|
|
private boolean overwrite;
|
|
private boolean overwrite;
|
|
private boolean update;
|
|
private boolean update;
|
|
private Path destPath = null;
|
|
private Path destPath = null;
|
|
@@ -238,14 +279,16 @@ public class CopyFiles implements Tool {
|
|
private JobConf job;
|
|
private JobConf job;
|
|
|
|
|
|
// stats
|
|
// stats
|
|
- private static final DecimalFormat pcntfmt = new DecimalFormat("0.00");
|
|
|
|
private int failcount = 0;
|
|
private int failcount = 0;
|
|
private int skipcount = 0;
|
|
private int skipcount = 0;
|
|
private int copycount = 0;
|
|
private int copycount = 0;
|
|
|
|
|
|
|
|
+ private String getCountString() {
|
|
|
|
+ return "Copied: " + copycount + " Skipped: " + skipcount
|
|
|
|
+ + " Failed: " + failcount;
|
|
|
|
+ }
|
|
private void updateStatus(Reporter reporter) {
|
|
private void updateStatus(Reporter reporter) {
|
|
- reporter.setStatus("Copied: " + copycount + " Skipped: " + skipcount +
|
|
|
|
- " Failed: " + failcount);
|
|
|
|
|
|
+ reporter.setStatus(getCountString());
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -257,6 +300,22 @@ public class CopyFiles implements Tool {
|
|
private boolean needsUpdate(FileStatus src, FileStatus dst) {
|
|
private boolean needsUpdate(FileStatus src, FileStatus dst) {
|
|
return update && src.getLen() != dst.getLen();
|
|
return update && src.getLen() != dst.getLen();
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ private FSDataOutputStream create(Path f, Reporter reporter,
|
|
|
|
+ FileStatus srcstat) throws IOException {
|
|
|
|
+ if (!preserve_status) {
|
|
|
|
+ return destFileSys.create(f, reporter);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ FsPermission permission = preseved.contains(FileAttribute.PERMISSION)?
|
|
|
|
+ srcstat.getPermission(): null;
|
|
|
|
+ short replication = preseved.contains(FileAttribute.REPLICATION)?
|
|
|
|
+ srcstat.getReplication(): destFileSys.getDefaultReplication();
|
|
|
|
+ long blockSize = preseved.contains(FileAttribute.BLOCK_SIZE)?
|
|
|
|
+ srcstat.getBlockSize(): destFileSys.getDefaultBlockSize();
|
|
|
|
+ return destFileSys.create(f, permission, true, sizeBuf, replication,
|
|
|
|
+ blockSize, reporter);
|
|
|
|
+ }
|
|
|
|
|
|
/**
|
|
/**
|
|
* Copy a file to a destination.
|
|
* Copy a file to a destination.
|
|
@@ -306,17 +365,14 @@ public class CopyFiles implements Tool {
|
|
in = srcstat.getPath().getFileSystem(job).open(srcstat.getPath());
|
|
in = srcstat.getPath().getFileSystem(job).open(srcstat.getPath());
|
|
reporter.incrCounter(Counter.BYTESEXPECTED, srcstat.getLen());
|
|
reporter.incrCounter(Counter.BYTESEXPECTED, srcstat.getLen());
|
|
// open tmp file
|
|
// open tmp file
|
|
- out = preserve_status
|
|
|
|
- ? destFileSys.create(tmpfile, true, sizeBuf, srcstat.getReplication(),
|
|
|
|
- srcstat.getBlockSize(), reporter)
|
|
|
|
- : destFileSys.create(tmpfile, reporter);
|
|
|
|
|
|
+ out = create(tmpfile, reporter, srcstat);
|
|
// copy file
|
|
// copy file
|
|
- int cbread;
|
|
|
|
- while ((cbread = in.read(buffer)) >= 0) {
|
|
|
|
|
|
+ for(int cbread; (cbread = in.read(buffer)) >= 0; ) {
|
|
out.write(buffer, 0, cbread);
|
|
out.write(buffer, 0, cbread);
|
|
cbcopied += cbread;
|
|
cbcopied += cbread;
|
|
- reporter.setStatus(pcntfmt.format(100.0 * cbcopied / srcstat.getLen())
|
|
|
|
- + " " + absdst + " [ " +
|
|
|
|
|
|
+ reporter.setStatus(
|
|
|
|
+ String.format("%.2f ", cbcopied*100.0/srcstat.getLen())
|
|
|
|
+ + absdst + " [ " +
|
|
StringUtils.humanReadableInt(cbcopied) + " / " +
|
|
StringUtils.humanReadableInt(cbcopied) + " / " +
|
|
StringUtils.humanReadableInt(srcstat.getLen()) + " ]");
|
|
StringUtils.humanReadableInt(srcstat.getLen()) + " ]");
|
|
}
|
|
}
|
|
@@ -350,7 +406,8 @@ public class CopyFiles implements Tool {
|
|
if (!destFileSys.mkdirs(absdst.getParent())) {
|
|
if (!destFileSys.mkdirs(absdst.getParent())) {
|
|
throw new IOException("Failed to craete parent dir: " + absdst.getParent());
|
|
throw new IOException("Failed to craete parent dir: " + absdst.getParent());
|
|
}
|
|
}
|
|
- rename(destFileSys, tmpfile, absdst);
|
|
|
|
|
|
+ rename(tmpfile, absdst);
|
|
|
|
+ updatePermissions(srcstat, destFileSys.getFileStatus(absdst));
|
|
}
|
|
}
|
|
|
|
|
|
// report at least once for each file
|
|
// report at least once for each file
|
|
@@ -361,23 +418,28 @@ public class CopyFiles implements Tool {
|
|
}
|
|
}
|
|
|
|
|
|
/** rename tmp to dst, delete dst if already exists */
|
|
/** rename tmp to dst, delete dst if already exists */
|
|
- private void rename(FileSystem fs, Path tmp, Path dst) throws IOException {
|
|
|
|
|
|
+ private void rename(Path tmp, Path dst) throws IOException {
|
|
try {
|
|
try {
|
|
- if (fs.exists(dst)) {
|
|
|
|
- fs.delete(dst, true);
|
|
|
|
|
|
+ if (destFileSys.exists(dst)) {
|
|
|
|
+ destFileSys.delete(dst, true);
|
|
}
|
|
}
|
|
- if (!fs.rename(tmp, dst)) {
|
|
|
|
|
|
+ if (!destFileSys.rename(tmp, dst)) {
|
|
throw new IOException();
|
|
throw new IOException();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
catch(IOException cause) {
|
|
catch(IOException cause) {
|
|
- IOException ioe = new IOException("Fail to rename tmp file (=" + tmp
|
|
|
|
- + ") to destination file (=" + dst + ")");
|
|
|
|
- ioe.initCause(cause);
|
|
|
|
- throw ioe;
|
|
|
|
|
|
+ throw (IOException)new IOException("Fail to rename tmp file (=" + tmp
|
|
|
|
+ + ") to destination file (=" + dst + ")").initCause(cause);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
|
|
+ private void updatePermissions(FileStatus src, FileStatus dst
|
|
|
|
+ ) throws IOException {
|
|
|
|
+ if (preserve_status) {
|
|
|
|
+ CopyFiles.updatePermissions(src, dst, preseved, destFileSys);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
static String bytesString(long b) {
|
|
static String bytesString(long b) {
|
|
return b + " bytes (" + StringUtils.humanReadableInt(b) + ")";
|
|
return b + " bytes (" + StringUtils.humanReadableInt(b) + ")";
|
|
}
|
|
}
|
|
@@ -399,6 +461,9 @@ public class CopyFiles implements Tool {
|
|
buffer = new byte[sizeBuf];
|
|
buffer = new byte[sizeBuf];
|
|
ignoreReadFailures = job.getBoolean(Options.IGNORE_READ_FAILURES.propertyname, false);
|
|
ignoreReadFailures = job.getBoolean(Options.IGNORE_READ_FAILURES.propertyname, false);
|
|
preserve_status = job.getBoolean(Options.PRESERVE_STATUS.propertyname, false);
|
|
preserve_status = job.getBoolean(Options.PRESERVE_STATUS.propertyname, false);
|
|
|
|
+ if (preserve_status) {
|
|
|
|
+ preseved = FileAttribute.parse(job.get(PRESERVE_STATUS_LABEL));
|
|
|
|
+ }
|
|
update = job.getBoolean(Options.UPDATE.propertyname, false);
|
|
update = job.getBoolean(Options.UPDATE.propertyname, false);
|
|
overwrite = !update && job.getBoolean(Options.OVERWRITE.propertyname, false);
|
|
overwrite = !update && job.getBoolean(Options.OVERWRITE.propertyname, false);
|
|
this.job = job;
|
|
this.job = job;
|
|
@@ -415,7 +480,7 @@ public class CopyFiles implements Tool {
|
|
OutputCollector<WritableComparable, Text> out,
|
|
OutputCollector<WritableComparable, Text> out,
|
|
Reporter reporter) throws IOException {
|
|
Reporter reporter) throws IOException {
|
|
FileStatus srcstat = value.input;
|
|
FileStatus srcstat = value.input;
|
|
- Path dstpath = value.output;
|
|
|
|
|
|
+ Path dstpath = new Path(value.output);
|
|
try {
|
|
try {
|
|
copy(srcstat, dstpath, out, reporter);
|
|
copy(srcstat, dstpath, out, reporter);
|
|
} catch (IOException e) {
|
|
} catch (IOException e) {
|
|
@@ -451,10 +516,8 @@ public class CopyFiles implements Tool {
|
|
if (0 == failcount || ignoreReadFailures) {
|
|
if (0 == failcount || ignoreReadFailures) {
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
- throw new IOException("Copied: " + copycount + " Skipped: " + skipcount +
|
|
|
|
- " Failed: " + failcount);
|
|
|
|
|
|
+ throw new IOException(getCountString());
|
|
}
|
|
}
|
|
-
|
|
|
|
}
|
|
}
|
|
|
|
|
|
private static List<Path> fetchFileList(Configuration conf, Path srcList)
|
|
private static List<Path> fetchFileList(Configuration conf, Path srcList)
|
|
@@ -490,7 +553,7 @@ public class CopyFiles implements Tool {
|
|
EnumSet<Options> flags = ignoreReadFailures
|
|
EnumSet<Options> flags = ignoreReadFailures
|
|
? EnumSet.of(Options.IGNORE_READ_FAILURES)
|
|
? EnumSet.of(Options.IGNORE_READ_FAILURES)
|
|
: EnumSet.noneOf(Options.class);
|
|
: EnumSet.noneOf(Options.class);
|
|
- copy(conf, tmp, new Path(destPath), logPath, flags);
|
|
|
|
|
|
+ copy(conf, tmp, new Path(destPath), logPath, flags, null);
|
|
}
|
|
}
|
|
|
|
|
|
/** Sanity check for srcPath */
|
|
/** Sanity check for srcPath */
|
|
@@ -515,18 +578,23 @@ public class CopyFiles implements Tool {
|
|
* @param logPath Log output directory
|
|
* @param logPath Log output directory
|
|
* @param flags Command-line flags
|
|
* @param flags Command-line flags
|
|
*/
|
|
*/
|
|
- public static void copy(Configuration conf, List<Path> srcPaths,
|
|
|
|
- Path destPath, Path logPath,
|
|
|
|
- EnumSet<Options> flags) throws IOException {
|
|
|
|
|
|
+ static void copy(Configuration conf, List<Path> srcPaths,
|
|
|
|
+ Path destPath, Path logPath, EnumSet<Options> flags,
|
|
|
|
+ String presevedAttributes) throws IOException {
|
|
LOG.info("srcPaths=" + srcPaths);
|
|
LOG.info("srcPaths=" + srcPaths);
|
|
LOG.info("destPath=" + destPath);
|
|
LOG.info("destPath=" + destPath);
|
|
checkSrcPath(conf, srcPaths);
|
|
checkSrcPath(conf, srcPaths);
|
|
|
|
|
|
JobConf job = createJobConf(conf);
|
|
JobConf job = createJobConf(conf);
|
|
|
|
+ if (presevedAttributes != null) {
|
|
|
|
+ job.set(PRESERVE_STATUS_LABEL, presevedAttributes);
|
|
|
|
+ }
|
|
|
|
+
|
|
//Initialize the mapper
|
|
//Initialize the mapper
|
|
try {
|
|
try {
|
|
setup(conf, job, srcPaths, destPath, logPath, flags);
|
|
setup(conf, job, srcPaths, destPath, logPath, flags);
|
|
JobClient.runJob(job);
|
|
JobClient.runJob(job);
|
|
|
|
+ finalize(conf, job, destPath, presevedAttributes);
|
|
} finally {
|
|
} finally {
|
|
//delete tmp
|
|
//delete tmp
|
|
fullyDelete(job.get(TMP_DIR_LABEL), job);
|
|
fullyDelete(job.get(TMP_DIR_LABEL), job);
|
|
@@ -535,6 +603,124 @@ public class CopyFiles implements Tool {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private static void updatePermissions(FileStatus src, FileStatus dst,
|
|
|
|
+ EnumSet<FileAttribute> preseved, FileSystem destFileSys
|
|
|
|
+ ) throws IOException {
|
|
|
|
+ String owner = null;
|
|
|
|
+ String group = null;
|
|
|
|
+ if (preseved.contains(FileAttribute.USER)
|
|
|
|
+ && !src.getOwner().equals(dst.getOwner())) {
|
|
|
|
+ owner = src.getOwner();
|
|
|
|
+ }
|
|
|
|
+ if (preseved.contains(FileAttribute.GROUP)
|
|
|
|
+ && !src.getGroup().equals(dst.getGroup())) {
|
|
|
|
+ group = src.getGroup();
|
|
|
|
+ }
|
|
|
|
+ if (owner != null || group != null) {
|
|
|
|
+ destFileSys.setOwner(dst.getPath(), owner, group);
|
|
|
|
+ }
|
|
|
|
+ if (preseved.contains(FileAttribute.PERMISSION)
|
|
|
|
+ && !src.getPermission().equals(dst.getPermission())) {
|
|
|
|
+ destFileSys.setPermission(dst.getPath(), src.getPermission());
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ static private void finalize(Configuration conf, JobConf jobconf,
|
|
|
|
+ final Path destPath, String presevedAttributes) throws IOException {
|
|
|
|
+ if (presevedAttributes == null) {
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+ EnumSet<FileAttribute> preseved = FileAttribute.parse(presevedAttributes);
|
|
|
|
+ if (!preseved.contains(FileAttribute.USER)
|
|
|
|
+ && !preseved.contains(FileAttribute.GROUP)
|
|
|
|
+ && !preseved.contains(FileAttribute.PERMISSION)) {
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ FileSystem dstfs = destPath.getFileSystem(conf);
|
|
|
|
+ Path dstdirlist = new Path(jobconf.get(DST_DIR_LIST_LABEL));
|
|
|
|
+ SequenceFile.Reader in = null;
|
|
|
|
+ try {
|
|
|
|
+ in = new SequenceFile.Reader(dstdirlist.getFileSystem(jobconf),
|
|
|
|
+ dstdirlist, jobconf);
|
|
|
|
+ Text dsttext = new Text();
|
|
|
|
+ FilePair pair = new FilePair();
|
|
|
|
+ for(; in.next(dsttext, pair); ) {
|
|
|
|
+ Path absdst = new Path(destPath, pair.output);
|
|
|
|
+ updatePermissions(pair.input, dstfs.getFileStatus(absdst),
|
|
|
|
+ preseved, dstfs);
|
|
|
|
+ }
|
|
|
|
+ } finally {
|
|
|
|
+ checkAndClose(in);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ static private class CommandArgument {
|
|
|
|
+ final List<Path> srcs;
|
|
|
|
+ final Path dst;
|
|
|
|
+ final Path log;
|
|
|
|
+ final EnumSet<Options> flags;
|
|
|
|
+ final String presevedAttributes;
|
|
|
|
+
|
|
|
|
+ CommandArgument(List<Path> srcs, Path dst, Path log,
|
|
|
|
+ EnumSet<Options> flags, String presevedAttributes) {
|
|
|
|
+ this.srcs = srcs;
|
|
|
|
+ this.dst = dst;
|
|
|
|
+ this.log = log;
|
|
|
|
+ this.flags = flags;
|
|
|
|
+ this.presevedAttributes = presevedAttributes;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ static CommandArgument valueOf(String[] args, Configuration conf
|
|
|
|
+ ) throws IOException {
|
|
|
|
+ List<Path> srcs = new ArrayList<Path>();
|
|
|
|
+ Path dst = null;
|
|
|
|
+ Path log = null;
|
|
|
|
+ EnumSet<Options> flags = EnumSet.noneOf(Options.class);
|
|
|
|
+ String presevedAttributes = null;
|
|
|
|
+
|
|
|
|
+ for (int idx = 0; idx < args.length; idx++) {
|
|
|
|
+ Options[] opt = Options.values();
|
|
|
|
+ int i = 0;
|
|
|
|
+ for(; i < opt.length && !args[idx].startsWith(opt[i].cmd); i++);
|
|
|
|
+
|
|
|
|
+ if (i < opt.length) {
|
|
|
|
+ flags.add(opt[i]);
|
|
|
|
+ if (opt[i] == Options.PRESERVE_STATUS) {
|
|
|
|
+ presevedAttributes = args[idx].substring(2);
|
|
|
|
+ FileAttribute.parse(presevedAttributes); //validation
|
|
|
|
+ }
|
|
|
|
+ } else if ("-f".equals(args[idx])) {
|
|
|
|
+ if (++idx == args.length) {
|
|
|
|
+ throw new IllegalArgumentException("urilist_uri not specified in -f");
|
|
|
|
+ }
|
|
|
|
+ srcs.addAll(fetchFileList(conf, new Path(args[idx])));
|
|
|
|
+ } else if ("-log".equals(args[idx])) {
|
|
|
|
+ if (++idx == args.length) {
|
|
|
|
+ throw new IllegalArgumentException("logdir not specified in -log");
|
|
|
|
+ }
|
|
|
|
+ log = new Path(args[idx]);
|
|
|
|
+ } else if ('-' == args[idx].codePointAt(0)) {
|
|
|
|
+ throw new IllegalArgumentException("Invalid switch " + args[idx]);
|
|
|
|
+ } else if (idx == args.length -1) {
|
|
|
|
+ dst = new Path(args[idx]);
|
|
|
|
+ } else {
|
|
|
|
+ srcs.add(new Path(args[idx]));
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ // mandatory command-line parameters
|
|
|
|
+ if (srcs.isEmpty() || dst == null) {
|
|
|
|
+ throw new IllegalArgumentException("Missing "
|
|
|
|
+ + (dst == null ? "dst path" : "src"));
|
|
|
|
+ }
|
|
|
|
+ // incompatible command-line flags
|
|
|
|
+ if (flags.contains(Options.OVERWRITE) && flags.contains(Options.UPDATE)) {
|
|
|
|
+ throw new IllegalArgumentException("Conflicting overwrite policies");
|
|
|
|
+ }
|
|
|
|
+ return new CommandArgument(srcs, dst, log, flags, presevedAttributes);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* This is the main driver for recursively copying directories
|
|
* This is the main driver for recursively copying directories
|
|
* across file systems. It takes at least two cmdline parameters. A source
|
|
* across file systems. It takes at least two cmdline parameters. A source
|
|
@@ -544,59 +730,14 @@ public class CopyFiles implements Tool {
|
|
* reduce is empty.
|
|
* reduce is empty.
|
|
*/
|
|
*/
|
|
public int run(String[] args) throws Exception {
|
|
public int run(String[] args) throws Exception {
|
|
- List<Path> srcPath = new ArrayList<Path>();
|
|
|
|
- Path destPath = null;
|
|
|
|
- Path logPath = null;
|
|
|
|
- EnumSet<Options> flags = EnumSet.noneOf(Options.class);
|
|
|
|
-
|
|
|
|
- for (int idx = 0; idx < args.length; idx++) {
|
|
|
|
- Options[] opt = Options.values();
|
|
|
|
- int i = 0;
|
|
|
|
- for(; i < opt.length && !opt[i].cmd.equals(args[idx]); i++);
|
|
|
|
-
|
|
|
|
- if (i < opt.length) {
|
|
|
|
- flags.add(opt[i]);
|
|
|
|
- }
|
|
|
|
- else if ("-f".equals(args[idx])) {
|
|
|
|
- if (++idx == args.length) {
|
|
|
|
- System.out.println("urilist_uri not specified");
|
|
|
|
- System.out.println(usage);
|
|
|
|
- return -1;
|
|
|
|
- }
|
|
|
|
- srcPath.addAll(fetchFileList(conf, new Path(args[idx])));
|
|
|
|
- } else if ("-log".equals(args[idx])) {
|
|
|
|
- if (++idx == args.length) {
|
|
|
|
- System.out.println("logdir not specified");
|
|
|
|
- System.out.println(usage);
|
|
|
|
- return -1;
|
|
|
|
- }
|
|
|
|
- logPath = new Path(args[idx]);
|
|
|
|
- } else if ('-' == args[idx].codePointAt(0)) {
|
|
|
|
- System.out.println("Invalid switch " + args[idx]);
|
|
|
|
- System.out.println(usage);
|
|
|
|
- ToolRunner.printGenericCommandUsage(System.out);
|
|
|
|
- return -1;
|
|
|
|
- } else if (idx == args.length -1) {
|
|
|
|
- destPath = new Path(args[idx]);
|
|
|
|
- } else {
|
|
|
|
- srcPath.add(new Path(args[idx]));
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- // mandatory command-line parameters
|
|
|
|
- if (srcPath.isEmpty() || destPath == null) {
|
|
|
|
- System.out.println("Missing " + (destPath == null ? "dst path" : "src"));
|
|
|
|
- System.out.println(usage);
|
|
|
|
- ToolRunner.printGenericCommandUsage(System.out);
|
|
|
|
- return -1;
|
|
|
|
- }
|
|
|
|
- // incompatible command-line flags
|
|
|
|
- if (flags.contains(Options.OVERWRITE) && flags.contains(Options.UPDATE)) {
|
|
|
|
- System.out.println("Conflicting overwrite policies");
|
|
|
|
- System.out.println(usage);
|
|
|
|
- return -1;
|
|
|
|
- }
|
|
|
|
try {
|
|
try {
|
|
- copy(conf, srcPath, destPath, logPath, flags);
|
|
|
|
|
|
+ CommandArgument p = CommandArgument.valueOf(args, conf);
|
|
|
|
+ copy(conf, p.srcs, p.dst, p.log, p.flags, p.presevedAttributes);
|
|
|
|
+ return 0;
|
|
|
|
+ } catch (IllegalArgumentException e) {
|
|
|
|
+ System.err.println(StringUtils.stringifyException(e) + "\n" + usage);
|
|
|
|
+ ToolRunner.printGenericCommandUsage(System.err);
|
|
|
|
+ return -1;
|
|
} catch (DuplicationException e) {
|
|
} catch (DuplicationException e) {
|
|
System.err.println(StringUtils.stringifyException(e));
|
|
System.err.println(StringUtils.stringifyException(e));
|
|
return DuplicationException.ERROR_CODE;
|
|
return DuplicationException.ERROR_CODE;
|
|
@@ -604,9 +745,8 @@ public class CopyFiles implements Tool {
|
|
System.err.println("With failures, global counters are inaccurate; " +
|
|
System.err.println("With failures, global counters are inaccurate; " +
|
|
"consider running with -i");
|
|
"consider running with -i");
|
|
System.err.println("Copy failed: " + StringUtils.stringifyException(e));
|
|
System.err.println("Copy failed: " + StringUtils.stringifyException(e));
|
|
- return -1;
|
|
|
|
|
|
+ return -999;
|
|
}
|
|
}
|
|
- return 0;
|
|
|
|
}
|
|
}
|
|
|
|
|
|
public static void main(String[] args) throws Exception {
|
|
public static void main(String[] args) throws Exception {
|
|
@@ -621,28 +761,26 @@ public class CopyFiles implements Tool {
|
|
* absPath is always assumed to descend from root.
|
|
* absPath is always assumed to descend from root.
|
|
* Otherwise returned path is null.
|
|
* Otherwise returned path is null.
|
|
*/
|
|
*/
|
|
- public static Path makeRelative(Path root, Path absPath) {
|
|
|
|
- if (!absPath.isAbsolute()) { return absPath; }
|
|
|
|
- String sRoot = root.toUri().getPath();
|
|
|
|
- String sPath = absPath.toUri().getPath();
|
|
|
|
- Enumeration<Object> rootTokens = new StringTokenizer(sRoot, "/");
|
|
|
|
- ArrayList rList = Collections.list(rootTokens);
|
|
|
|
- Enumeration<Object> pathTokens = new StringTokenizer(sPath, "/");
|
|
|
|
- ArrayList pList = Collections.list(pathTokens);
|
|
|
|
- Iterator rIter = rList.iterator();
|
|
|
|
- Iterator pIter = pList.iterator();
|
|
|
|
- while (rIter.hasNext()) {
|
|
|
|
- String rElem = (String) rIter.next();
|
|
|
|
- String pElem = (String) pIter.next();
|
|
|
|
- if (!rElem.equals(pElem)) { return null; }
|
|
|
|
- }
|
|
|
|
- StringBuffer sb = new StringBuffer();
|
|
|
|
- while (pIter.hasNext()) {
|
|
|
|
- String pElem = (String) pIter.next();
|
|
|
|
- sb.append(pElem);
|
|
|
|
- if (pIter.hasNext()) { sb.append("/"); }
|
|
|
|
- }
|
|
|
|
- return new Path(sb.toString());
|
|
|
|
|
|
+ static String makeRelative(Path root, Path absPath) {
|
|
|
|
+ if (!absPath.isAbsolute()) {
|
|
|
|
+ throw new IllegalArgumentException("!absPath.isAbsolute(), absPath="
|
|
|
|
+ + absPath);
|
|
|
|
+ }
|
|
|
|
+ String p = absPath.toUri().getPath();
|
|
|
|
+
|
|
|
|
+ StringTokenizer pathTokens = new StringTokenizer(p, "/");
|
|
|
|
+ for(StringTokenizer rootTokens = new StringTokenizer(
|
|
|
|
+ root.toUri().getPath(), "/"); rootTokens.hasMoreTokens(); ) {
|
|
|
|
+ if (!rootTokens.nextToken().equals(pathTokens.nextToken())) {
|
|
|
|
+ return null;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ StringBuilder sb = new StringBuilder();
|
|
|
|
+ for(; pathTokens.hasMoreTokens(); ) {
|
|
|
|
+ sb.append(pathTokens.nextToken());
|
|
|
|
+ if (pathTokens.hasMoreTokens()) { sb.append(Path.SEPARATOR); }
|
|
|
|
+ }
|
|
|
|
+ return sb.length() == 0? ".": sb.toString();
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -681,7 +819,7 @@ public class CopyFiles implements Tool {
|
|
jobconf.setOutputKeyClass(Text.class);
|
|
jobconf.setOutputKeyClass(Text.class);
|
|
jobconf.setOutputValueClass(Text.class);
|
|
jobconf.setOutputValueClass(Text.class);
|
|
|
|
|
|
- jobconf.setMapperClass(FSCopyFilesMapper.class);
|
|
|
|
|
|
+ jobconf.setMapperClass(CopyFilesMapper.class);
|
|
jobconf.setNumReduceTasks(0);
|
|
jobconf.setNumReduceTasks(0);
|
|
return jobconf;
|
|
return jobconf;
|
|
}
|
|
}
|
|
@@ -735,7 +873,9 @@ public class CopyFiles implements Tool {
|
|
String filename = "_distcp_logs_" + randomId;
|
|
String filename = "_distcp_logs_" + randomId;
|
|
if (!dstExists || !dstIsDir) {
|
|
if (!dstExists || !dstIsDir) {
|
|
Path parent = destPath.getParent();
|
|
Path parent = destPath.getParent();
|
|
- dstfs.mkdirs(parent);
|
|
|
|
|
|
+ if (!dstfs.exists(parent)) {
|
|
|
|
+ dstfs.mkdirs(parent);
|
|
|
|
+ }
|
|
logPath = new Path(parent, filename);
|
|
logPath = new Path(parent, filename);
|
|
} else {
|
|
} else {
|
|
logPath = new Path(destPath, filename);
|
|
logPath = new Path(destPath, filename);
|
|
@@ -757,35 +897,41 @@ public class CopyFiles implements Tool {
|
|
dstfilelist, Text.class, Text.class,
|
|
dstfilelist, Text.class, Text.class,
|
|
SequenceFile.CompressionType.NONE);
|
|
SequenceFile.CompressionType.NONE);
|
|
|
|
|
|
|
|
+ Path dstdirlist = new Path(jobDirectory, "_distcp_dst_dirs");
|
|
|
|
+ jobConf.set(DST_DIR_LIST_LABEL, dstdirlist.toString());
|
|
|
|
+ SequenceFile.Writer dir_writer = SequenceFile.createWriter(jobfs, jobConf,
|
|
|
|
+ dstdirlist, Text.class, FilePair.class,
|
|
|
|
+ SequenceFile.CompressionType.NONE);
|
|
|
|
+
|
|
// handle the case where the destination directory doesn't exist
|
|
// handle the case where the destination directory doesn't exist
|
|
// and we've only a single src directory OR we're updating/overwriting
|
|
// and we've only a single src directory OR we're updating/overwriting
|
|
// the contents of the destination directory.
|
|
// the contents of the destination directory.
|
|
final boolean special =
|
|
final boolean special =
|
|
(srcPaths.size() == 1 && !dstExists) || updateORoverwrite;
|
|
(srcPaths.size() == 1 && !dstExists) || updateORoverwrite;
|
|
- int srcCount = 0, cnsyncf = 0;
|
|
|
|
|
|
+ int srcCount = 0, cnsyncf = 0, dirsyn = 0;
|
|
long cbsize = 0L, cbsyncs = 0L;
|
|
long cbsize = 0L, cbsyncs = 0L;
|
|
try {
|
|
try {
|
|
- for (Path p : srcPaths) {
|
|
|
|
- FileSystem fs = p.getFileSystem(conf);
|
|
|
|
- boolean pIsDir = fs.getFileStatus(p).isDir();
|
|
|
|
- Path root = special && pIsDir? p: p.getParent();
|
|
|
|
- if (pIsDir) {
|
|
|
|
|
|
+ for (Path src : srcPaths) {
|
|
|
|
+ FileSystem fs = src.getFileSystem(conf);
|
|
|
|
+ FileStatus srcfilestat = fs.getFileStatus(src);
|
|
|
|
+ Path root = special && srcfilestat.isDir()? src: src.getParent();
|
|
|
|
+ if (srcfilestat.isDir()) {
|
|
++srcCount;
|
|
++srcCount;
|
|
}
|
|
}
|
|
|
|
|
|
- Stack<Path> pathstack = new Stack<Path>();
|
|
|
|
- pathstack.push(p);
|
|
|
|
- while (!pathstack.empty()) {
|
|
|
|
- for (FileStatus stat : fs.listStatus(pathstack.pop())) {
|
|
|
|
|
|
+ Stack<FileStatus> pathstack = new Stack<FileStatus>();
|
|
|
|
+ for(pathstack.push(srcfilestat); !pathstack.empty(); ) {
|
|
|
|
+ FileStatus cur = pathstack.pop();
|
|
|
|
+ for(FileStatus child : fs.listStatus(cur.getPath())) {
|
|
++srcCount;
|
|
++srcCount;
|
|
|
|
|
|
- if (stat.isDir()) {
|
|
|
|
- pathstack.push(stat.getPath());
|
|
|
|
|
|
+ if (child.isDir()) {
|
|
|
|
+ pathstack.push(child);
|
|
}
|
|
}
|
|
else {
|
|
else {
|
|
++cnsyncf;
|
|
++cnsyncf;
|
|
- cbsyncs += stat.getLen();
|
|
|
|
- cbsize += stat.getLen();
|
|
|
|
|
|
+ cbsyncs += child.getLen();
|
|
|
|
+ cbsize += child.getLen();
|
|
|
|
|
|
if (cnsyncf > SYNC_FILE_MAX || cbsyncs > BYTES_PER_MAP) {
|
|
if (cnsyncf > SYNC_FILE_MAX || cbsyncs > BYTES_PER_MAP) {
|
|
src_writer.sync();
|
|
src_writer.sync();
|
|
@@ -795,17 +941,27 @@ public class CopyFiles implements Tool {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- Path dst = makeRelative(root, stat.getPath());
|
|
|
|
- src_writer.append(new LongWritable(stat.isDir()? 0: stat.getLen()),
|
|
|
|
- new FilePair(stat, dst));
|
|
|
|
- dst_writer.append(new Text(dst.toString()),
|
|
|
|
- new Text(stat.getPath().toString()));
|
|
|
|
|
|
+ String dst = makeRelative(root, child.getPath());
|
|
|
|
+ src_writer.append(new LongWritable(child.isDir()? 0: child.getLen()),
|
|
|
|
+ new FilePair(child, dst));
|
|
|
|
+ dst_writer.append(new Text(dst),
|
|
|
|
+ new Text(child.getPath().toString()));
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if (cur.isDir()) {
|
|
|
|
+ String dst = makeRelative(root, cur.getPath());
|
|
|
|
+ dir_writer.append(new Text(dst), new FilePair(cur, dst));
|
|
|
|
+ if (++dirsyn > SYNC_FILE_MAX) {
|
|
|
|
+ dirsyn = 0;
|
|
|
|
+ dir_writer.sync();
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
} finally {
|
|
} finally {
|
|
checkAndClose(src_writer);
|
|
checkAndClose(src_writer);
|
|
checkAndClose(dst_writer);
|
|
checkAndClose(dst_writer);
|
|
|
|
+ checkAndClose(dir_writer);
|
|
}
|
|
}
|
|
|
|
|
|
// create dest path dir if copying > 1 file
|
|
// create dest path dir if copying > 1 file
|
|
@@ -814,7 +970,7 @@ public class CopyFiles implements Tool {
|
|
throw new IOException("Failed to create" + destPath);
|
|
throw new IOException("Failed to create" + destPath);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
checkDuplication(jobfs, dstfilelist,
|
|
checkDuplication(jobfs, dstfilelist,
|
|
new Path(jobDirectory, "_distcp_sorted"), conf);
|
|
new Path(jobDirectory, "_distcp_sorted"), conf);
|
|
|
|
|