|
@@ -30,6 +30,7 @@ import java.util.Map;
|
|
|
import java.util.Map.Entry;
|
|
|
import java.util.NoSuchElementException;
|
|
|
|
|
|
+import org.apache.hadoop.fs.CreateFlag;
|
|
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
|
import org.apache.hadoop.fs.FilterFileSystem;
|
|
@@ -45,6 +46,9 @@ import org.apache.hadoop.fs.permission.AclUtil;
|
|
|
import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
import org.apache.hadoop.io.IOUtils;
|
|
|
|
|
|
+import static org.apache.hadoop.fs.CreateFlag.CREATE;
|
|
|
+import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST;
|
|
|
+
|
|
|
/**
|
|
|
* Provides: argument processing to ensure the destination is valid
|
|
|
* for the number of source arguments. A processPaths that accepts both
|
|
@@ -56,6 +60,7 @@ abstract class CommandWithDestination extends FsCommand {
|
|
|
private boolean overwrite = false;
|
|
|
private boolean verifyChecksum = true;
|
|
|
private boolean writeChecksum = true;
|
|
|
+ private boolean lazyPersist = false;
|
|
|
|
|
|
/**
|
|
|
* The name of the raw xattr namespace. It would be nice to use
|
|
@@ -78,6 +83,10 @@ abstract class CommandWithDestination extends FsCommand {
|
|
|
overwrite = flag;
|
|
|
}
|
|
|
|
|
|
+ protected void setLazyPersist(boolean flag) {
|
|
|
+ lazyPersist = flag;
|
|
|
+ }
|
|
|
+
|
|
|
protected void setVerifyChecksum(boolean flag) {
|
|
|
verifyChecksum = flag;
|
|
|
}
|
|
@@ -379,7 +388,7 @@ abstract class CommandWithDestination extends FsCommand {
|
|
|
try {
|
|
|
PathData tempTarget = target.suffix("._COPYING_");
|
|
|
targetFs.setWriteChecksum(writeChecksum);
|
|
|
- targetFs.writeStreamToFile(in, tempTarget);
|
|
|
+ targetFs.writeStreamToFile(in, tempTarget, lazyPersist);
|
|
|
targetFs.rename(tempTarget, target);
|
|
|
} finally {
|
|
|
targetFs.close(); // last ditch effort to ensure temp file is removed
|
|
@@ -449,10 +458,11 @@ abstract class CommandWithDestination extends FsCommand {
|
|
|
super(fs);
|
|
|
}
|
|
|
|
|
|
- void writeStreamToFile(InputStream in, PathData target) throws IOException {
|
|
|
+ void writeStreamToFile(InputStream in, PathData target,
|
|
|
+ boolean lazyPersist) throws IOException {
|
|
|
FSDataOutputStream out = null;
|
|
|
try {
|
|
|
- out = create(target);
|
|
|
+ out = create(target, lazyPersist);
|
|
|
IOUtils.copyBytes(in, out, getConf(), true);
|
|
|
} finally {
|
|
|
IOUtils.closeStream(out); // just in case copyBytes didn't
|
|
@@ -460,9 +470,21 @@ abstract class CommandWithDestination extends FsCommand {
|
|
|
}
|
|
|
|
|
|
// tag created files as temp files
|
|
|
- FSDataOutputStream create(PathData item) throws IOException {
|
|
|
+ FSDataOutputStream create(PathData item, boolean lazyPersist)
|
|
|
+ throws IOException {
|
|
|
try {
|
|
|
- return create(item.path, true);
|
|
|
+ EnumSet<CreateFlag> createFlags = EnumSet.of(CREATE);
|
|
|
+ if (lazyPersist) {
|
|
|
+ createFlags.add(LAZY_PERSIST);
|
|
|
+ }
|
|
|
+ return create(item.path,
|
|
|
+ null,
|
|
|
+ createFlags,
|
|
|
+ getConf().getInt("io.file.buffer.size", 4096),
|
|
|
+ lazyPersist ? 1 : getDefaultReplication(item.path),
|
|
|
+ getDefaultBlockSize(),
|
|
|
+ null,
|
|
|
+ null);
|
|
|
} finally { // might have been created but stream was interrupted
|
|
|
deleteOnExit(item.path);
|
|
|
}
|