|
@@ -35,6 +35,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
|
|
|
import org.apache.hadoop.fs.FileAlreadyExistsException;
|
|
|
import org.apache.hadoop.fs.FileStatus;
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
|
+import org.apache.hadoop.fs.ParentNotDirectoryException;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
import org.apache.hadoop.fs.s3native.NativeS3FileSystem;
|
|
@@ -62,7 +63,7 @@ public class S3FileSystem extends FileSystem {
|
|
|
public S3FileSystem() {
|
|
|
// set store in initialize()
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
public S3FileSystem(FileSystemStore store) {
|
|
|
this.store = store;
|
|
|
}
|
|
@@ -90,14 +91,14 @@ public class S3FileSystem extends FileSystem {
|
|
|
}
|
|
|
store.initialize(uri, conf);
|
|
|
setConf(conf);
|
|
|
- this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
|
|
|
+ this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
|
|
|
this.workingDir =
|
|
|
new Path("/user", System.getProperty("user.name")).makeQualified(this);
|
|
|
- }
|
|
|
+ }
|
|
|
|
|
|
private static FileSystemStore createDefaultStore(Configuration conf) {
|
|
|
FileSystemStore store = new Jets3tFileSystemStore();
|
|
|
-
|
|
|
+
|
|
|
RetryPolicy basePolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
|
|
|
conf.getInt("fs.s3.maxRetries", 4),
|
|
|
conf.getLong("fs.s3.sleepTimeSeconds", 10), TimeUnit.SECONDS);
|
|
@@ -105,13 +106,13 @@ public class S3FileSystem extends FileSystem {
|
|
|
new HashMap<Class<? extends Exception>, RetryPolicy>();
|
|
|
exceptionToPolicyMap.put(IOException.class, basePolicy);
|
|
|
exceptionToPolicyMap.put(S3Exception.class, basePolicy);
|
|
|
-
|
|
|
+
|
|
|
RetryPolicy methodPolicy = RetryPolicies.retryByException(
|
|
|
RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
|
|
|
Map<String,RetryPolicy> methodNameToPolicyMap = new HashMap<String,RetryPolicy>();
|
|
|
methodNameToPolicyMap.put("storeBlock", methodPolicy);
|
|
|
methodNameToPolicyMap.put("retrieveBlock", methodPolicy);
|
|
|
-
|
|
|
+
|
|
|
return (FileSystemStore) RetryProxy.create(FileSystemStore.class,
|
|
|
store, methodNameToPolicyMap);
|
|
|
}
|
|
@@ -144,21 +145,29 @@ public class S3FileSystem extends FileSystem {
|
|
|
paths.add(0, absolutePath);
|
|
|
absolutePath = absolutePath.getParent();
|
|
|
} while (absolutePath != null);
|
|
|
-
|
|
|
+
|
|
|
boolean result = true;
|
|
|
- for (Path p : paths) {
|
|
|
- result &= mkdir(p);
|
|
|
+ for (int i = 0; i < paths.size(); i++) {
|
|
|
+ Path p = paths.get(i);
|
|
|
+ try {
|
|
|
+ result &= mkdir(p);
|
|
|
+ } catch(FileAlreadyExistsException e) {
|
|
|
+ if (i + 1 < paths.size()) {
|
|
|
+ throw new ParentNotDirectoryException(e.getMessage());
|
|
|
+ }
|
|
|
+ throw e;
|
|
|
+ }
|
|
|
}
|
|
|
return result;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
private boolean mkdir(Path path) throws IOException {
|
|
|
Path absolutePath = makeAbsolute(path);
|
|
|
INode inode = store.retrieveINode(absolutePath);
|
|
|
if (inode == null) {
|
|
|
store.storeINode(absolutePath, INode.DIRECTORY_INODE);
|
|
|
} else if (inode.isFile()) {
|
|
|
- throw new IOException(String.format(
|
|
|
+ throw new FileAlreadyExistsException(String.format(
|
|
|
"Can't make directory for path %s since it is a file.",
|
|
|
absolutePath));
|
|
|
}
|
|
@@ -176,11 +185,12 @@ public class S3FileSystem extends FileSystem {
|
|
|
|
|
|
private INode checkFile(Path path) throws IOException {
|
|
|
INode inode = store.retrieveINode(makeAbsolute(path));
|
|
|
+ String message = String.format("No such file: '%s'", path.toString());
|
|
|
if (inode == null) {
|
|
|
- throw new IOException("No such file.");
|
|
|
+ throw new FileNotFoundException(message + " does not exist");
|
|
|
}
|
|
|
if (inode.isDirectory()) {
|
|
|
- throw new IOException("Path " + path + " is a directory.");
|
|
|
+ throw new FileNotFoundException(message + " is a directory");
|
|
|
}
|
|
|
return inode;
|
|
|
}
|
|
@@ -222,10 +232,14 @@ public class S3FileSystem extends FileSystem {
|
|
|
|
|
|
INode inode = store.retrieveINode(makeAbsolute(file));
|
|
|
if (inode != null) {
|
|
|
- if (overwrite) {
|
|
|
+ if (overwrite && !inode.isDirectory()) {
|
|
|
delete(file, true);
|
|
|
} else {
|
|
|
- throw new FileAlreadyExistsException("File already exists: " + file);
|
|
|
+ String message = String.format("File already exists: '%s'", file);
|
|
|
+ if (inode.isDirectory()) {
|
|
|
+ message = message + " is a directory";
|
|
|
+ }
|
|
|
+ throw new FileAlreadyExistsException(message);
|
|
|
}
|
|
|
} else {
|
|
|
Path parent = file.getParent();
|
|
@@ -233,7 +247,7 @@ public class S3FileSystem extends FileSystem {
|
|
|
if (!mkdirs(parent)) {
|
|
|
throw new IOException("Mkdirs failed to create " + parent.toString());
|
|
|
}
|
|
|
- }
|
|
|
+ }
|
|
|
}
|
|
|
return new FSDataOutputStream
|
|
|
(new S3OutputStream(getConf(), store, makeAbsolute(file),
|
|
@@ -259,7 +273,7 @@ public class S3FileSystem extends FileSystem {
|
|
|
if (debugEnabled) {
|
|
|
LOG.debug(debugPreamble + "returning false as src does not exist");
|
|
|
}
|
|
|
- return false;
|
|
|
+ return false;
|
|
|
}
|
|
|
|
|
|
Path absoluteDst = makeAbsolute(dst);
|
|
@@ -404,7 +418,7 @@ public class S3FileSystem extends FileSystem {
|
|
|
store.deleteBlock(block);
|
|
|
}
|
|
|
} else {
|
|
|
- FileStatus[] contents = null;
|
|
|
+ FileStatus[] contents = null;
|
|
|
try {
|
|
|
contents = listStatus(absolutePath);
|
|
|
} catch(FileNotFoundException fnfe) {
|
|
@@ -412,7 +426,7 @@ public class S3FileSystem extends FileSystem {
|
|
|
}
|
|
|
|
|
|
if ((contents.length !=0) && (!recursive)) {
|
|
|
- throw new IOException("Directory " + path.toString()
|
|
|
+ throw new IOException("Directory " + path.toString()
|
|
|
+ " is not empty.");
|
|
|
}
|
|
|
for (FileStatus p:contents) {
|
|
@@ -424,9 +438,9 @@ public class S3FileSystem extends FileSystem {
|
|
|
}
|
|
|
return true;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
- * FileStatus for S3 file systems.
|
|
|
+ * FileStatus for S3 file systems.
|
|
|
*/
|
|
|
@Override
|
|
|
public FileStatus getFileStatus(Path f) throws IOException {
|
|
@@ -436,7 +450,7 @@ public class S3FileSystem extends FileSystem {
|
|
|
}
|
|
|
return new S3FileStatus(f.makeQualified(this), inode);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
@Override
|
|
|
public long getDefaultBlockSize() {
|
|
|
return getConf().getLong("fs.s3.block.size", 64 * 1024 * 1024);
|