|
@@ -157,30 +157,39 @@ public class DistributedFileSystem extends FileSystem {
|
|
}
|
|
}
|
|
|
|
|
|
private void doFromLocalFile(File src, File dst, boolean deleteSource) throws IOException {
|
|
private void doFromLocalFile(File src, File dst, boolean deleteSource) throws IOException {
|
|
- if (exists(dst)) {
|
|
|
|
- if (! isDirectory(dst)) {
|
|
|
|
|
|
+ FileSystem localFs = getNamed("local", getConf());
|
|
|
|
+ doCopy( localFs, src, this, dst, deleteSource, getConf() );
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public static void doCopy(FileSystem srcFS,
|
|
|
|
+ File src,
|
|
|
|
+ FileSystem dstFS,
|
|
|
|
+ File dst,
|
|
|
|
+ boolean deleteSource,
|
|
|
|
+ Configuration conf
|
|
|
|
+ ) throws IOException {
|
|
|
|
+ if (dstFS.exists(dst)) {
|
|
|
|
+ if (! dstFS.isDirectory(dst)) {
|
|
throw new IOException("Target " + dst + " already exists");
|
|
throw new IOException("Target " + dst + " already exists");
|
|
} else {
|
|
} else {
|
|
dst = new File(dst, src.getName());
|
|
dst = new File(dst, src.getName());
|
|
- if (exists(dst)) {
|
|
|
|
|
|
+ if (dstFS.exists(dst)) {
|
|
throw new IOException("Target " + dst + " already exists");
|
|
throw new IOException("Target " + dst + " already exists");
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- FileSystem localFs = getNamed("local", getConf());
|
|
|
|
-
|
|
|
|
- if (localFs.isDirectory(src)) {
|
|
|
|
- mkdirs(dst);
|
|
|
|
- File contents[] = localFs.listFiles(src);
|
|
|
|
|
|
+ if (srcFS.isDirectory(src)) {
|
|
|
|
+ dstFS.mkdirs(dst);
|
|
|
|
+ File contents[] = srcFS.listFiles(src);
|
|
for (int i = 0; i < contents.length; i++) {
|
|
for (int i = 0; i < contents.length; i++) {
|
|
- doFromLocalFile(contents[i], new File(dst, contents[i].getName()), deleteSource);
|
|
|
|
|
|
+ doCopy( srcFS, contents[i], dstFS, new File(dst, contents[i].getName()), deleteSource, conf);
|
|
}
|
|
}
|
|
} else {
|
|
} else {
|
|
- byte buf[] = new byte[getConf().getInt("io.file.buffer.size", 4096)];
|
|
|
|
- InputStream in = localFs.open(src);
|
|
|
|
|
|
+ byte buf[] = new byte[conf.getInt("io.file.buffer.size", 4096)];
|
|
|
|
+ InputStream in = srcFS.open(src);
|
|
try {
|
|
try {
|
|
- OutputStream out = create(dst);
|
|
|
|
|
|
+ OutputStream out = dstFS.create(dst);
|
|
try {
|
|
try {
|
|
int bytesRead = in.read(buf);
|
|
int bytesRead = in.read(buf);
|
|
while (bytesRead >= 0) {
|
|
while (bytesRead >= 0) {
|
|
@@ -195,48 +204,13 @@ public class DistributedFileSystem extends FileSystem {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
if (deleteSource)
|
|
if (deleteSource)
|
|
- localFs.delete(src);
|
|
|
|
|
|
+ srcFS.delete(src);
|
|
}
|
|
}
|
|
|
|
|
|
public void copyToLocalFile(File src, File dst) throws IOException {
|
|
public void copyToLocalFile(File src, File dst) throws IOException {
|
|
- if (dst.exists()) {
|
|
|
|
- if (! dst.isDirectory()) {
|
|
|
|
- throw new IOException("Target " + dst + " already exists");
|
|
|
|
- } else {
|
|
|
|
- dst = new File(dst, src.getName());
|
|
|
|
- if (dst.exists()) {
|
|
|
|
- throw new IOException("Target " + dst + " already exists");
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
dst = dst.getCanonicalFile();
|
|
dst = dst.getCanonicalFile();
|
|
-
|
|
|
|
FileSystem localFs = getNamed("local", getConf());
|
|
FileSystem localFs = getNamed("local", getConf());
|
|
-
|
|
|
|
- if (isDirectory(src)) {
|
|
|
|
- localFs.mkdirs(dst);
|
|
|
|
- File contents[] = listFiles(src);
|
|
|
|
- for (int i = 0; i < contents.length; i++) {
|
|
|
|
- copyToLocalFile(contents[i], new File(dst, contents[i].getName()));
|
|
|
|
- }
|
|
|
|
- } else {
|
|
|
|
- byte buf[] = new byte[getConf().getInt("io.file.buffer.size", 4096)];
|
|
|
|
- InputStream in = open(src);
|
|
|
|
- try {
|
|
|
|
- OutputStream out = localFs.create(dst);
|
|
|
|
- try {
|
|
|
|
- int bytesRead = in.read(buf);
|
|
|
|
- while (bytesRead >= 0) {
|
|
|
|
- out.write(buf, 0, bytesRead);
|
|
|
|
- bytesRead = in.read(buf);
|
|
|
|
- }
|
|
|
|
- } finally {
|
|
|
|
- out.close();
|
|
|
|
- }
|
|
|
|
- } finally {
|
|
|
|
- in.close();
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
|
|
+ doCopy( this, src, localFs, dst, false, getConf() );
|
|
}
|
|
}
|
|
|
|
|
|
public File startLocalOutput(File fsOutputFile, File tmpLocalFile) throws IOException {
|
|
public File startLocalOutput(File fsOutputFile, File tmpLocalFile) throws IOException {
|