
HADOOP-10400. Incorporate new S3A FileSystem implementation. Contributed by Jordan Mendelson and Dave Wang.

Aaron T. Myers 10 years ago
parent
commit
a0c54aeb00
25 changed files with 2550 additions and 9 deletions
  1. 3 0
      hadoop-common-project/hadoop-common/CHANGES.txt
  2. 5 0
      hadoop-common-project/hadoop-common/src/main/conf/log4j.properties
  3. 86 0
      hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
  4. 8 0
      hadoop-hdfs-project/hadoop-hdfs-httpfs/pom.xml
  5. 18 8
      hadoop-project/pom.xml
  6. 10 0
      hadoop-tools/hadoop-aws/pom.xml
  7. 37 0
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AnonymousAWSCredentialsProvider.java
  8. 51 0
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BasicAWSCredentialsProvider.java
  9. 90 0
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
  10. 62 0
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java
  11. 1019 0
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
  12. 207 0
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
  13. 208 0
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java
  14. 1 0
      hadoop-tools/hadoop-aws/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem
  15. 43 0
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/S3AContract.java
  16. 38 0
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractCreate.java
  17. 31 0
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractDelete.java
  18. 34 0
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractMkdir.java
  19. 31 0
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractOpen.java
  20. 64 0
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractRename.java
  21. 35 0
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractRootDir.java
  22. 31 0
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractSeek.java
  23. 327 0
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3AFileSystemContractBaseTest.java
  24. 105 0
      hadoop-tools/hadoop-aws/src/test/resources/contract/s3a.xml
  25. 6 1
      hadoop-tools/hadoop-aws/src/test/resources/contract/s3n.xml

+ 3 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -11,6 +11,9 @@ Release 2.6.0 - UNRELEASED
     HADOOP-10893. isolated classloader on the client side (Sangjin Lee via
     jlowe)
 
+    HADOOP-10400. Incorporate new S3A FileSystem implementation. (Jordan
+    Mendelson and Dave Wang via atm)
+
   IMPROVEMENTS
 
     HADOOP-10808. Remove unused native code for munlock. (cnauroth)

+ 5 - 0
hadoop-common-project/hadoop-common/src/main/conf/log4j.properties

@@ -174,6 +174,11 @@ log4j.appender.MRAUDIT.MaxBackupIndex=${mapred.audit.log.maxbackupindex}
 # Jets3t library
 log4j.logger.org.jets3t.service.impl.rest.httpclient.RestS3Service=ERROR
 
+# AWS SDK & S3A FileSystem
+log4j.logger.com.amazonaws=ERROR
+log4j.logger.com.amazonaws.http.AmazonHttpClient=ERROR
+log4j.logger.org.apache.hadoop.fs.s3a.S3AFileSystem=WARN
+
 #
 # Event Counter Appender
 # Sends counts of logging messages at different severity levels to Hadoop Metrics.

+ 86 - 0
hadoop-common-project/hadoop-common/src/main/resources/core-default.xml

@@ -681,6 +681,92 @@ for ldap providers in the same way as above does.
   </description>
 </property>
 
+<property>
+  <name>fs.s3a.access.key</name>
+  <description>AWS access key ID. Omit for Role-based authentication.</description>
+</property>
+
+<property>
+  <name>fs.s3a.secret.key</name>
+  <description>AWS secret key. Omit for Role-based authentication.</description>
+</property>
+
+<property>
+  <name>fs.s3a.connection.maximum</name>
+  <value>15</value>
+  <description>Controls the maximum number of simultaneous connections to S3.</description>
+</property>
+
+<property>
+  <name>fs.s3a.connection.ssl.enabled</name>
+  <value>true</value>
+  <description>Enables or disables SSL connections to S3.</description>
+</property>
+
+<property>
+  <name>fs.s3a.attempts.maximum</name>
+  <value>10</value>
+  <description>How many times we should retry commands on transient errors.</description>
+</property>
+
+<property>
+  <name>fs.s3a.connection.timeout</name>
+  <value>5000</value>
+  <description>Socket connection timeout in milliseconds.</description>
+</property>
+
+<property>
+  <name>fs.s3a.paging.maximum</name>
+  <value>5000</value>
+  <description>How many keys to request from S3 at a time when doing
+     directory listings.</description>
+</property>
+
+<property>
+  <name>fs.s3a.multipart.size</name>
+  <value>104857600</value>
+  <description>The size (in bytes) that upload or copy operations are split into.</description>
+</property>
+
+<property>
+  <name>fs.s3a.multipart.threshold</name>
+  <value>2147483647</value>
+  <description>Threshold before uploads or copies use parallel multipart operations.</description>
+</property>
+
+<property>
+  <name>fs.s3a.acl.default</name>
+  <description>Set a canned ACL for newly created and copied objects. Value may be private, 
+     public-read, public-read-write, authenticated-read, log-delivery-write, 
+     bucket-owner-read, or bucket-owner-full-control.</description>
+</property>
+
+<property>
+  <name>fs.s3a.multipart.purge</name>
+  <value>false</value>
+  <description>True if you want to purge existing multipart uploads that may not have been
+     completed/aborted correctly</description>
+</property>
+
+<property>
+  <name>fs.s3a.multipart.purge.age</name>
+  <value>86400</value>
+  <description>Minimum age in seconds of multipart uploads to purge</description>
+</property>
+
+<property>
+  <name>fs.s3a.buffer.dir</name>
+  <value>${hadoop.tmp.dir}/s3a</value>
+  <description>Comma separated list of directories that will be used to buffer
+    file uploads.</description>
+</property>
+
+<property>
+  <name>fs.s3a.impl</name>
+  <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
+  <description>The implementation class of the S3A Filesystem</description>
+</property>
+
 <property>
   <name>io.seqfile.compress.blocksize</name>
   <value>1000000</value>

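These core-default.xml entries describe the client-side tunables that the new filesystem reads. Below is a minimal sketch, not part of the commit, of setting the same keys programmatically on a Hadoop Configuration; the bucket name and key values are placeholders.

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class S3AConfigSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Credentials; leave these unset to fall back to instance-profile or anonymous access.
    conf.set("fs.s3a.access.key", "YOUR_ACCESS_KEY");   // placeholder
    conf.set("fs.s3a.secret.key", "YOUR_SECRET_KEY");   // placeholder
    // A couple of the tunables documented above, set here to their defaults.
    conf.setInt("fs.s3a.connection.maximum", 15);
    conf.setBoolean("fs.s3a.connection.ssl.enabled", true);

    // "my-bucket" is a placeholder; the s3a scheme maps to S3AFileSystem via fs.s3a.impl.
    FileSystem fs = FileSystem.get(URI.create("s3a://my-bucket/"), conf);
    System.out.println("Connected to " + fs.getUri());
  }
}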
+ 8 - 0
hadoop-hdfs-project/hadoop-hdfs-httpfs/pom.xml

@@ -146,6 +146,10 @@
           <groupId>net.java.dev.jets3t</groupId>
           <artifactId>jets3t</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>com.amazonaws</groupId>
+          <artifactId>aws-java-sdk</artifactId>
+        </exclusion>
         <exclusion>
           <groupId>org.eclipse.jdt</groupId>
           <artifactId>core</artifactId>
@@ -209,6 +213,10 @@
           <groupId>net.java.dev.jets3t</groupId>
           <artifactId>jets3t</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>com.amazonaws</groupId>
+          <artifactId>aws-java-sdk</artifactId>
+        </exclusion>
         <exclusion>
           <groupId>org.eclipse.jdt</groupId>
           <artifactId>core</artifactId>

+ 18 - 8
hadoop-project/pom.xml

@@ -61,8 +61,9 @@
     <!-- jersey version -->
     <jersey.version>1.9</jersey.version>
 
-    <!-- jackson version -->
+    <!-- jackson versions -->
     <jackson.version>1.9.13</jackson.version>
+    <jackson2.version>2.2.3</jackson2.version>
 
     <!-- ProtocolBuffer version, used to verify the protoc version and -->
     <!-- define the protobuf JAR version                               -->
@@ -604,13 +605,7 @@
       <dependency>
         <groupId>com.amazonaws</groupId>
         <artifactId>aws-java-sdk</artifactId>
-        <version>1.7.2</version>
-        <exclusions>
-          <exclusion>
-            <groupId>com.fasterxml.jackson.core</groupId>
-            <artifactId>jackson-core</artifactId>
-          </exclusion>
-        </exclusions>
+        <version>1.7.4</version>
       </dependency>
       <dependency>
         <groupId>org.apache.mina</groupId>
@@ -697,6 +692,21 @@
         <artifactId>jackson-xc</artifactId>
         <version>${jackson.version}</version>
       </dependency>
+      <dependency>
+        <groupId>com.fasterxml.jackson.core</groupId>
+        <artifactId>jackson-core</artifactId>
+        <version>${jackson2.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>com.fasterxml.jackson.core</groupId>
+        <artifactId>jackson-databind</artifactId>
+        <version>${jackson2.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>com.fasterxml.jackson.core</groupId>
+        <artifactId>jackson-annotations</artifactId>
+        <version>${jackson2.version}</version>
+      </dependency>
       <dependency>
         <groupId>org.mockito</groupId>
         <artifactId>mockito-all</artifactId>

+ 10 - 0
hadoop-tools/hadoop-aws/pom.xml

@@ -100,6 +100,16 @@
       <type>test-jar</type>
     </dependency>
 
+    <!-- see ../../hadoop-project/pom.xml for versions -->
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-annotations</artifactId>
+    </dependency>
+
     <dependency>
       <groupId>com.amazonaws</groupId>
       <artifactId>aws-java-sdk</artifactId>

+ 37 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AnonymousAWSCredentialsProvider.java

@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.AnonymousAWSCredentials;
+import com.amazonaws.auth.AWSCredentials;
+
+public class AnonymousAWSCredentialsProvider implements AWSCredentialsProvider {
+  public AWSCredentials getCredentials() {
+    return new AnonymousAWSCredentials();
+  }
+
+  public void refresh() {}
+
+  @Override
+  public String toString() {
+    return getClass().getSimpleName();
+  }
+
+}

+ 51 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BasicAWSCredentialsProvider.java

@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.auth.AWSCredentials;
+
+public class BasicAWSCredentialsProvider implements AWSCredentialsProvider {
+  private String accessKey;
+  private String secretKey;
+
+  public BasicAWSCredentialsProvider(String accessKey, String secretKey) {
+    this.accessKey = accessKey;
+    this.secretKey = secretKey;
+  }
+
+  public AWSCredentials getCredentials() {
+    if (accessKey != null && secretKey != null) {
+      return new BasicAWSCredentials(accessKey, secretKey);
+    }
+
+    throw new AmazonClientException(
+        "Access key or secret key is null");
+  }
+
+  public void refresh() {}
+
+  @Override
+  public String toString() {
+    return getClass().getSimpleName();
+  }
+
+}

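For orientation, a short sketch of how these two providers are meant to be combined, mirroring the AWSCredentialsProviderChain that S3AFileSystem.initialize() builds further down in this patch; the null keys are placeholders.

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProviderChain;
import com.amazonaws.auth.InstanceProfileCredentialsProvider;
import org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider;
import org.apache.hadoop.fs.s3a.BasicAWSCredentialsProvider;

public class CredentialChainSketch {
  public static void main(String[] args) {
    // accessKey/secretKey may be null; BasicAWSCredentialsProvider then throws
    // and the chain falls through to the next provider in order.
    String accessKey = null;   // placeholder
    String secretKey = null;   // placeholder

    AWSCredentialsProviderChain chain = new AWSCredentialsProviderChain(
        new BasicAWSCredentialsProvider(accessKey, secretKey),
        new InstanceProfileCredentialsProvider(),
        new AnonymousAWSCredentialsProvider());

    AWSCredentials creds = chain.getCredentials();
    System.out.println("Resolved credentials via: " + creds.getClass().getSimpleName());
  }
}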
+ 90 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java

@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+
+public class Constants {
+  // s3 access key
+  public static final String OLD_ACCESS_KEY = "fs.s3a.awsAccessKeyId";
+  public static final String NEW_ACCESS_KEY = "fs.s3a.access.key";
+
+  // s3 secret key
+  public static final String OLD_SECRET_KEY = "fs.s3a.awsSecretAccessKey";
+  public static final String NEW_SECRET_KEY = "fs.s3a.secret.key";
+  
+  // number of simultaneous connections to s3
+  public static final String OLD_MAXIMUM_CONNECTIONS = "fs.s3a.maxConnections";
+  public static final String NEW_MAXIMUM_CONNECTIONS = "fs.s3a.connection.maximum";
+  public static final int DEFAULT_MAXIMUM_CONNECTIONS = 15;
+  
+  // connect to s3 over ssl?
+  public static final String OLD_SECURE_CONNECTIONS = "fs.s3a.secureConnections";
+  public static final String NEW_SECURE_CONNECTIONS = "fs.s3a.connection.ssl.enabled";
+  public static final boolean DEFAULT_SECURE_CONNECTIONS = true;
+  
+  // number of times we should retry errors
+  public static final String OLD_MAX_ERROR_RETRIES = "fs.s3a.maxErrorRetries";
+  public static final String NEW_MAX_ERROR_RETRIES = "fs.s3a.attempts.maximum";
+  public static final int DEFAULT_MAX_ERROR_RETRIES = 10;
+  
+  // milliseconds until we give up on a connection to s3
+  public static final String OLD_SOCKET_TIMEOUT = "fs.s3a.socketTimeout";
+  public static final String NEW_SOCKET_TIMEOUT = "fs.s3a.connection.timeout";
+  public static final int DEFAULT_SOCKET_TIMEOUT = 50000;
+
+  // number of records to get while paging through a directory listing
+  public static final String OLD_MAX_PAGING_KEYS = "fs.s3a.maxPagingKeys";
+  public static final String NEW_MAX_PAGING_KEYS = "fs.s3a.paging.maximum";
+  public static final int DEFAULT_MAX_PAGING_KEYS = 5000;
+
+  // size of each multipart piece, in bytes
+  public static final String OLD_MULTIPART_SIZE = "fs.s3a.multipartSize";
+  public static final String NEW_MULTIPART_SIZE = "fs.s3a.multipart.size";
+  public static final long DEFAULT_MULTIPART_SIZE = 104857600; // 100 MB
+  
+  // minimum size in bytes before we start a multipart upload or copy
+  public static final String OLD_MIN_MULTIPART_THRESHOLD = "fs.s3a.minMultipartSize";
+  public static final String NEW_MIN_MULTIPART_THRESHOLD = "fs.s3a.multipart.threshold";
+  public static final int DEFAULT_MIN_MULTIPART_THRESHOLD = Integer.MAX_VALUE;
+  
+  // comma separated list of directories
+  public static final String BUFFER_DIR = "fs.s3a.buffer.dir";
+
+  // private | public-read | public-read-write | authenticated-read | 
+  // log-delivery-write | bucket-owner-read | bucket-owner-full-control
+  public static final String OLD_CANNED_ACL = "fs.s3a.cannedACL";
+  public static final String NEW_CANNED_ACL = "fs.s3a.acl.default";
+  public static final String DEFAULT_CANNED_ACL = "";
+
+  // should we try to purge old multipart uploads when starting up
+  public static final String OLD_PURGE_EXISTING_MULTIPART = "fs.s3a.purgeExistingMultiPart";
+  public static final String NEW_PURGE_EXISTING_MULTIPART = "fs.s3a.multipart.purge";
+  public static final boolean DEFAULT_PURGE_EXISTING_MULTIPART = false;
+
+  // purge any multipart uploads older than this number of seconds
+  public static final String OLD_PURGE_EXISTING_MULTIPART_AGE = "fs.s3a.purgeExistingMultiPartAge";
+  public static final String NEW_PURGE_EXISTING_MULTIPART_AGE = "fs.s3a.multipart.purge.age";
+  public static final long DEFAULT_PURGE_EXISTING_MULTIPART_AGE = 14400;
+
+  // s3 server-side encryption
+  public static final String SERVER_SIDE_ENCRYPTION_ALGORITHM = 
+    "fs.s3a.server-side-encryption-algorithm";
+  
+  public static final String S3N_FOLDER_SUFFIX = "_$folder$";
+}

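Each setting carries both an OLD_* (camelCase) key and a NEW_* (dotted) key; the filesystem reads the new key first, falls back to the old key, and finally to the compiled-in default. A minimal sketch of that lookup pattern, matching what initialize() does below:

import org.apache.hadoop.conf.Configuration;
import static org.apache.hadoop.fs.s3a.Constants.*;

public class FallbackLookupSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // New key wins, old key is consulted next, compiled-in default last.
    int maxConnections = conf.getInt(NEW_MAXIMUM_CONNECTIONS,
        conf.getInt(OLD_MAXIMUM_CONNECTIONS, DEFAULT_MAXIMUM_CONNECTIONS));
    long partSize = conf.getLong(NEW_MULTIPART_SIZE,
        conf.getLong(OLD_MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE));
    System.out.println(maxConnections + " connections, " + partSize + " byte parts");
  }
}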
+ 62 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java

@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+
+public class S3AFileStatus extends FileStatus {
+  private boolean isEmptyDirectory;
+
+  // Directories
+  public S3AFileStatus(boolean isdir, boolean isemptydir, Path path) {
+    super(0, isdir, 1, 0, 0, path);
+    isEmptyDirectory = isemptydir;
+  }
+
+  // Files
+  public S3AFileStatus(long length, long modification_time, Path path) {
+    super(length, false, 1, 0, modification_time, path);
+    isEmptyDirectory = false;
+  }
+
+  public boolean isEmptyDirectory() {
+    return isEmptyDirectory;
+  }
+  
+  /** Compare this object with another object for equality.
+   * @param   o the object to be compared.
+   * @return  true if the two file statuses have the same path name; false if not.
+   */
+  @Override
+  public boolean equals(Object o) {
+    return super.equals(o);
+  }
+  
+  /**
+   * Returns a hash code value for the object, which is defined as
+   * the hash code of the path name.
+   *
+   * @return  a hash code value for the path name.
+   */
+  @Override
+  public int hashCode() {
+    return super.hashCode();
+  }
+}

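A short illustrative sketch, with a made-up bucket path, of how the two constructors above distinguish fake directory markers from regular objects, as listStatus() and getFileStatus() do later in this patch:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileStatus;

public class FileStatusSketch {
  public static void main(String[] args) {
    // Zero-byte key ending in "/" => fake directory marker; (true, true) means empty directory.
    S3AFileStatus dir = new S3AFileStatus(true, true, new Path("s3a://my-bucket/logs"));
    // Regular object => file status carrying length and last-modified time.
    S3AFileStatus file = new S3AFileStatus(1024L, System.currentTimeMillis(),
        new Path("s3a://my-bucket/logs/part-00000"));
    System.out.println(dir.isDirectory() + " empty=" + dir.isEmptyDirectory());
    System.out.println(file.isFile() + " len=" + file.getLen());
  }
}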
+ 1019 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

@@ -0,0 +1,1019 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.Protocol;
+import com.amazonaws.auth.AWSCredentialsProviderChain;
+
+import com.amazonaws.auth.InstanceProfileCredentialsProvider;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.CannedAccessControlList;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import com.amazonaws.services.s3.model.ListObjectsRequest;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.CopyObjectRequest;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.amazonaws.services.s3.transfer.Copy;
+import com.amazonaws.services.s3.transfer.TransferManager;
+import com.amazonaws.services.s3.transfer.TransferManagerConfiguration;
+import com.amazonaws.services.s3.transfer.Upload;
+import com.amazonaws.event.ProgressListener;
+import com.amazonaws.event.ProgressEvent;
+
+import org.apache.commons.lang.StringUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
+
+import static org.apache.hadoop.fs.s3a.Constants.*;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class S3AFileSystem extends FileSystem {
+  private URI uri;
+  private Path workingDir;
+  private AmazonS3Client s3;
+  private String bucket;
+  private int maxKeys;
+  private long partSize;
+  private int partSizeThreshold;
+  public static final Logger LOG = LoggerFactory.getLogger(S3AFileSystem.class);
+  private CannedAccessControlList cannedACL;
+  private String serverSideEncryptionAlgorithm;
+
+
+  /** Called after a new FileSystem instance is constructed.
+   * @param name a uri whose authority section names the host, port, etc.
+   *   for this FileSystem
+   * @param conf the configuration
+   */
+  public void initialize(URI name, Configuration conf) throws IOException {
+    super.initialize(name, conf);
+
+
+    uri = URI.create(name.getScheme() + "://" + name.getAuthority());
+    workingDir = new Path("/user", System.getProperty("user.name")).makeQualified(this.uri,
+        this.getWorkingDirectory());
+
+    // Try to get our credentials or just connect anonymously
+    String accessKey = conf.get(NEW_ACCESS_KEY, conf.get(OLD_ACCESS_KEY, null));
+    String secretKey = conf.get(NEW_SECRET_KEY, conf.get(OLD_SECRET_KEY, null));
+
+    String userInfo = name.getUserInfo();
+    if (userInfo != null) {
+      int index = userInfo.indexOf(':');
+      if (index != -1) {
+        accessKey = userInfo.substring(0, index);
+        secretKey = userInfo.substring(index + 1);
+      } else {
+        accessKey = userInfo;
+      }
+    }
+
+    AWSCredentialsProviderChain credentials = new AWSCredentialsProviderChain(
+        new BasicAWSCredentialsProvider(accessKey, secretKey),
+        new InstanceProfileCredentialsProvider(),
+        new AnonymousAWSCredentialsProvider()
+    );
+
+    bucket = name.getHost();
+
+    ClientConfiguration awsConf = new ClientConfiguration();
+    awsConf.setMaxConnections(conf.getInt(NEW_MAXIMUM_CONNECTIONS, 
+      conf.getInt(OLD_MAXIMUM_CONNECTIONS, DEFAULT_MAXIMUM_CONNECTIONS)));
+    awsConf.setProtocol(conf.getBoolean(NEW_SECURE_CONNECTIONS, 
+      conf.getBoolean(OLD_SECURE_CONNECTIONS, DEFAULT_SECURE_CONNECTIONS)) ? 
+        Protocol.HTTPS : Protocol.HTTP);
+    awsConf.setMaxErrorRetry(conf.getInt(NEW_MAX_ERROR_RETRIES, 
+      conf.getInt(OLD_MAX_ERROR_RETRIES, DEFAULT_MAX_ERROR_RETRIES)));
+    awsConf.setSocketTimeout(conf.getInt(NEW_SOCKET_TIMEOUT, 
+      conf.getInt(OLD_SOCKET_TIMEOUT, DEFAULT_SOCKET_TIMEOUT)));
+
+    s3 = new AmazonS3Client(credentials, awsConf);
+
+    maxKeys = conf.getInt(NEW_MAX_PAGING_KEYS, 
+      conf.getInt(OLD_MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS));
+    partSize = conf.getLong(NEW_MULTIPART_SIZE, 
+      conf.getLong(OLD_MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE));
+    partSizeThreshold = conf.getInt(NEW_MIN_MULTIPART_THRESHOLD, 
+      conf.getInt(OLD_MIN_MULTIPART_THRESHOLD, DEFAULT_MIN_MULTIPART_THRESHOLD));
+
+    if (partSize < 5 * 1024 * 1024) {
+      LOG.error(NEW_MULTIPART_SIZE + " must be at least 5 MB");
+      partSize = 5 * 1024 * 1024;
+    }
+
+    if (partSizeThreshold < 5 * 1024 * 1024) {
+      LOG.error(NEW_MIN_MULTIPART_THRESHOLD + " must be at least 5 MB");
+      partSizeThreshold = 5 * 1024 * 1024;
+    }
+
+    String cannedACLName = conf.get(NEW_CANNED_ACL, 
+      conf.get(OLD_CANNED_ACL, DEFAULT_CANNED_ACL));
+    if (!cannedACLName.isEmpty()) {
+      cannedACL = CannedAccessControlList.valueOf(cannedACLName);
+    } else {
+      cannedACL = null;
+    }
+
+    if (!s3.doesBucketExist(bucket)) {
+      throw new IOException("Bucket " + bucket + " does not exist");
+    }
+
+    boolean purgeExistingMultipart = conf.getBoolean(NEW_PURGE_EXISTING_MULTIPART, 
+      conf.getBoolean(OLD_PURGE_EXISTING_MULTIPART, DEFAULT_PURGE_EXISTING_MULTIPART));
+    long purgeExistingMultipartAge = conf.getLong(NEW_PURGE_EXISTING_MULTIPART_AGE, 
+      conf.getLong(OLD_PURGE_EXISTING_MULTIPART_AGE, DEFAULT_PURGE_EXISTING_MULTIPART_AGE));
+
+    if (purgeExistingMultipart) {
+      TransferManager transferManager = new TransferManager(s3);
+      Date purgeBefore = new Date(new Date().getTime() - purgeExistingMultipartAge*1000);
+
+      transferManager.abortMultipartUploads(bucket, purgeBefore);
+      transferManager.shutdownNow(false);
+    }
+
+    serverSideEncryptionAlgorithm = conf.get(SERVER_SIDE_ENCRYPTION_ALGORITHM);
+
+    setConf(conf);
+  }
+
+  /**
+   * Return the protocol scheme for the FileSystem.
+   *
+   * @return "s3a"
+   */
+  public String getScheme() {
+    return "s3a";
+  }
+
+  /** Returns a URI whose scheme and authority identify this FileSystem.*/
+  public URI getUri() {
+    return uri;
+  }
+
+
+  public S3AFileSystem() {
+    super();
+  }
+
+  /* Turns a path (relative or otherwise) into an S3 key
+   */
+  private String pathToKey(Path path) {
+    if (!path.isAbsolute()) {
+      path = new Path(workingDir, path);
+    }
+
+    if (path.toUri().getScheme() != null && path.toUri().getPath().isEmpty()) {
+      return "";
+    }
+
+    return path.toUri().getPath().substring(1);
+  }
+
+  private Path keyToPath(String key) {
+    return new Path("/" + key);
+  }
+
+  /**
+   * Opens an FSDataInputStream at the indicated Path.
+   * @param f the file name to open
+   * @param bufferSize the size of the buffer to be used.
+   */
+  public FSDataInputStream open(Path f, int bufferSize)
+      throws IOException {
+
+    LOG.info("Opening '" + f + "' for reading");
+    final FileStatus fileStatus = getFileStatus(f);
+    if (fileStatus.isDirectory()) {
+      throw new FileNotFoundException("Can't open " + f + " because it is a directory");
+    }
+
+    return new FSDataInputStream(new S3AInputStream(bucket, pathToKey(f), 
+      fileStatus.getLen(), s3, statistics));
+  }
+
+  /**
+   * Create an FSDataOutputStream at the indicated Path with write-progress
+   * reporting.
+   * @param f the file name to open
+   * @param permission
+   * @param overwrite if a file with this name already exists, then if true,
+   *   the file will be overwritten, and if false an error will be thrown.
+   * @param bufferSize the size of the buffer to be used.
+   * @param replication required block replication for the file.
+   * @param blockSize
+   * @param progress
+   * @throws IOException
+   * @see #setPermission(Path, FsPermission)
+   */
+  public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, 
+    int bufferSize, short replication, long blockSize, Progressable progress) throws IOException {
+    String key = pathToKey(f);
+
+    if (!overwrite && exists(f)) {
+      throw new FileAlreadyExistsException(f + " already exists");
+    }
+
+    // We pass null to FSDataOutputStream so it won't count writes that are being buffered to a file
+    return new FSDataOutputStream(new S3AOutputStream(getConf(), s3, this, 
+      bucket, key, progress, cannedACL, statistics, 
+      serverSideEncryptionAlgorithm), null);
+  }
+
+  /**
+   * Append to an existing file (optional operation).
+   * @param f the existing file to be appended.
+   * @param bufferSize the size of the buffer to be used.
+   * @param progress for reporting progress if it is not null.
+   * @throws IOException
+   */
+  public FSDataOutputStream append(Path f, int bufferSize, 
+    Progressable progress) throws IOException {
+    throw new IOException("Not supported");
+  }
+
+
+  /**
+   * Renames Path src to Path dst.  Can take place on local fs
+   * or remote DFS.
+   *
+   * Warning: S3 does not support renames. This method does a copy which can 
+   * take S3 some time to execute with large files and directories. Since 
+   * there is no Progressable passed in, this can time out jobs.
+   *
+   * Note: This implementation differs with other S3 drivers. Specifically:
+   *       Fails if src is a file and dst is a directory.
+   *       Fails if src is a directory and dst is a file.
+   *       Fails if the parent of dst does not exist or is a file.
+   *       Fails if dst is a directory that is not empty.
+   *
+   * @param src path to be renamed
+   * @param dst new path after rename
+   * @throws IOException on failure
+   * @return true if rename is successful
+   */
+  public boolean rename(Path src, Path dst) throws IOException {
+    LOG.info("Rename path " + src + " to " + dst);
+
+    String srcKey = pathToKey(src);
+    String dstKey = pathToKey(dst);
+
+    if (srcKey.length() == 0 || dstKey.length() == 0) {
+      LOG.info("rename: src or dst are empty");
+      return false;
+    }
+
+    if (srcKey.equals(dstKey)) {
+      LOG.info("rename: src and dst refer to the same file");
+      return true;
+    }
+
+    S3AFileStatus srcStatus;
+    try {
+      srcStatus = getFileStatus(src);
+    } catch (FileNotFoundException e) {
+      LOG.info("rename: src not found " + src);
+      return false;
+    }
+
+    S3AFileStatus dstStatus = null;
+    try {
+      dstStatus = getFileStatus(dst);
+
+      if (srcStatus.isFile() && dstStatus.isDirectory()) {
+        LOG.info("rename: src is a file and dst is a directory");
+        return false;
+      }
+
+      if (srcStatus.isDirectory() && dstStatus.isFile()) {
+        LOG.info("rename: src is a directory and dst is a file");
+        return false;
+      }
+
+    } catch (FileNotFoundException e) {
+      // Parent must exist
+      Path parent = dst.getParent();
+      if (!pathToKey(parent).isEmpty()) {
+        try {
+          S3AFileStatus dstParentStatus = getFileStatus(dst.getParent());
+          if (!dstParentStatus.isDirectory()) {
+            return false;
+          }
+        } catch (FileNotFoundException e2) {
+          return false;
+        }
+      }
+    }
+
+    // Ok! Time to start
+    if (srcStatus.isFile()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("rename: renaming file " + src + " to " + dst);
+      }
+      copyFile(srcKey, dstKey);
+      delete(src, false);
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("rename: renaming directory " + src + " to " + dst);
+      }
+
+      // This is a directory to directory copy
+      if (!dstKey.endsWith("/")) {
+        dstKey = dstKey + "/";
+      }
+
+      if (!srcKey.endsWith("/")) {
+        srcKey = srcKey + "/";
+      }
+
+      List<DeleteObjectsRequest.KeyVersion> keysToDelete = 
+        new ArrayList<DeleteObjectsRequest.KeyVersion>();
+      if (dstStatus != null && dstStatus.isEmptyDirectory()) {
+        copyFile(srcKey, dstKey);
+        statistics.incrementWriteOps(1);
+        keysToDelete.add(new DeleteObjectsRequest.KeyVersion(srcKey));
+      }
+
+      ListObjectsRequest request = new ListObjectsRequest();
+      request.setBucketName(bucket);
+      request.setPrefix(srcKey);
+      request.setMaxKeys(maxKeys);
+
+      ObjectListing objects = s3.listObjects(request);
+      statistics.incrementReadOps(1);
+
+      while (true) {
+        for (S3ObjectSummary summary : objects.getObjectSummaries()) {
+          keysToDelete.add(new DeleteObjectsRequest.KeyVersion(summary.getKey()));
+          String newDstKey = dstKey + summary.getKey().substring(srcKey.length());
+          copyFile(summary.getKey(), newDstKey);
+        }
+
+        if (objects.isTruncated()) {
+          objects = s3.listNextBatchOfObjects(objects);
+          statistics.incrementReadOps(1);
+        } else {
+          break;
+        }
+      }
+
+
+      if (!keysToDelete.isEmpty()) {
+        DeleteObjectsRequest deleteRequest = new DeleteObjectsRequest(bucket);
+        deleteRequest.setKeys(keysToDelete);
+        s3.deleteObjects(deleteRequest);
+        statistics.incrementWriteOps(1);
+      }
+    }
+
+    if (src.getParent() != dst.getParent()) {
+      deleteUnnecessaryFakeDirectories(dst.getParent());
+      createFakeDirectoryIfNecessary(src.getParent());
+    }
+    return true;
+  }
+
+  /** Delete a file.
+   *
+   * @param f the path to delete.
+   * @param recursive if path is a directory and set to
+   * true, the directory is deleted else throws an exception. In
+   * case of a file the recursive can be set to either true or false.
+   * @return  true if delete is successful else false.
+   * @throws IOException
+   */
+  public boolean delete(Path f, boolean recursive) throws IOException {
+    LOG.info("Delete path " + f + " - recursive " + recursive);
+    S3AFileStatus status;
+    try {
+      status = getFileStatus(f);
+    } catch (FileNotFoundException e) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Couldn't delete " + f + " - does not exist");
+      }
+      return false;
+    }
+
+    String key = pathToKey(f);
+
+    if (status.isDirectory()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("delete: Path is a directory");
+      }
+
+      if (!recursive && !status.isEmptyDirectory()) {
+        throw new IOException("Path is a folder: " + f + 
+                              " and it is not an empty directory");
+      }
+
+      if (!key.endsWith("/")) {
+        key = key + "/";
+      }
+
+      if (key.equals("/")) {
+        LOG.info("s3a cannot delete the root directory");
+        return false;
+      }
+
+      if (status.isEmptyDirectory()) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Deleting fake empty directory");
+        }
+        s3.deleteObject(bucket, key);
+        statistics.incrementWriteOps(1);
+      } else {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Getting objects for directory prefix " + key + " to delete");
+        }
+
+        ListObjectsRequest request = new ListObjectsRequest();
+        request.setBucketName(bucket);
+        request.setPrefix(key);
+        // Hopefully not setting a delimiter will cause this to find everything
+        //request.setDelimiter("/");
+        request.setMaxKeys(maxKeys);
+
+        List<DeleteObjectsRequest.KeyVersion> keys = 
+          new ArrayList<DeleteObjectsRequest.KeyVersion>();
+        ObjectListing objects = s3.listObjects(request);
+        statistics.incrementReadOps(1);
+        while (true) {
+          for (S3ObjectSummary summary : objects.getObjectSummaries()) {
+            keys.add(new DeleteObjectsRequest.KeyVersion(summary.getKey()));
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Got object to delete " + summary.getKey());
+            }
+          }
+
+          DeleteObjectsRequest deleteRequest = new DeleteObjectsRequest(bucket);
+          deleteRequest.setKeys(keys);
+          s3.deleteObjects(deleteRequest);
+          statistics.incrementWriteOps(1);
+          keys.clear();
+
+          if (objects.isTruncated()) {
+            objects = s3.listNextBatchOfObjects(objects);
+            statistics.incrementReadOps(1);
+          } else {
+            break;
+          }
+        }
+      }
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("delete: Path is a file");
+      }
+      s3.deleteObject(bucket, key);
+      statistics.incrementWriteOps(1);
+    }
+
+    createFakeDirectoryIfNecessary(f.getParent());
+
+    return true;
+  }
+
+  private void createFakeDirectoryIfNecessary(Path f) throws IOException {
+    String key = pathToKey(f);
+    if (!key.isEmpty() && !exists(f)) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Creating new fake directory at " + f);
+      }
+      createFakeDirectory(bucket, key);
+    }
+  }
+
+  /**
+   * List the statuses of the files/directories in the given path if the path is
+   * a directory.
+   *
+   * @param f given path
+   * @return the statuses of the files/directories in the given path
+   * @throws FileNotFoundException when the path does not exist;
+   *         IOException see specific implementation
+   */
+  public FileStatus[] listStatus(Path f) throws FileNotFoundException,
+      IOException {
+    String key = pathToKey(f);
+    LOG.info("List status for path: " + f);
+
+    final List<FileStatus> result = new ArrayList<FileStatus>();
+    final FileStatus fileStatus =  getFileStatus(f);
+
+    if (fileStatus.isDirectory()) {
+      if (!key.isEmpty()) {
+        key = key + "/";
+      }
+
+      ListObjectsRequest request = new ListObjectsRequest();
+      request.setBucketName(bucket);
+      request.setPrefix(key);
+      request.setDelimiter("/");
+      request.setMaxKeys(maxKeys);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("listStatus: doing listObjects for directory " + key);
+      }
+
+      ObjectListing objects = s3.listObjects(request);
+      statistics.incrementReadOps(1);
+
+      while (true) {
+        for (S3ObjectSummary summary : objects.getObjectSummaries()) {
+          Path keyPath = keyToPath(summary.getKey()).makeQualified(uri, workingDir);
+          // Skip over keys that are ourselves and old S3N _$folder$ files
+          if (keyPath.equals(f) || summary.getKey().endsWith(S3N_FOLDER_SUFFIX)) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Ignoring: " + keyPath);
+            }
+            continue;
+          }
+
+          if (objectRepresentsDirectory(summary.getKey(), summary.getSize())) {
+            result.add(new S3AFileStatus(true, true, keyPath));
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Adding: fd: " + keyPath);
+            }
+          } else {
+            result.add(new S3AFileStatus(summary.getSize(), 
+              dateToLong(summary.getLastModified()), keyPath));
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Adding: fi: " + keyPath);
+            }
+          }
+        }
+
+        for (String prefix : objects.getCommonPrefixes()) {
+          Path keyPath = keyToPath(prefix).makeQualified(uri, workingDir);
+          if (keyPath.equals(f)) {
+            continue;
+          }
+          result.add(new S3AFileStatus(true, false, keyPath));
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Adding: rd: " + keyPath);
+          }
+        }
+
+        if (objects.isTruncated()) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("listStatus: list truncated - getting next batch");
+          }
+
+          objects = s3.listNextBatchOfObjects(objects);
+          statistics.incrementReadOps(1);
+        } else {
+          break;
+        }
+      }
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Adding: rd (not a dir): " + f);
+      }
+      result.add(fileStatus);
+    }
+
+    return result.toArray(new FileStatus[result.size()]);
+  }
+
+
+
+  /**
+   * Set the current working directory for the given file system. All relative
+   * paths will be resolved relative to it.
+   *
+   * @param new_dir
+   */
+  public void setWorkingDirectory(Path new_dir) {
+    workingDir = new_dir;
+  }
+
+  /**
+   * Get the current working directory for the given file system
+   * @return the directory pathname
+   */
+  public Path getWorkingDirectory() {
+    return workingDir;
+  }
+
+  /**
+   * Make the given file and all non-existent parents into
+   * directories. Has the semantics of Unix 'mkdir -p'.
+   * Existence of the directory hierarchy is not an error.
+   * @param f path to create
+   * @param permission to apply to f
+   */
+  // TODO: If we have created an empty file at /foo/bar and we then call 
+  // mkdirs for /foo/bar/baz/roo what happens to the empty file /foo/bar/?
+  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+    LOG.info("Making directory: " + f);
+
+    try {
+      FileStatus fileStatus = getFileStatus(f);
+
+      if (fileStatus.isDirectory()) {
+        return true;
+      } else {
+        throw new FileAlreadyExistsException("Path is a file: " + f);
+      }
+    } catch (FileNotFoundException e) {
+      Path fPart = f;
+      do {
+        try {
+          FileStatus fileStatus = getFileStatus(fPart);
+          if (fileStatus.isFile()) {
+            throw new FileAlreadyExistsException(String.format(
+                "Can't make directory for path '%s' since it is a file.", 
+                fPart));
+          }
+        } catch (FileNotFoundException fnfe) {
+        }
+        fPart = fPart.getParent();
+      } while (fPart != null);
+
+      String key = pathToKey(f);
+      createFakeDirectory(bucket, key);
+      return true;
+    }
+  }
+
+  /**
+   * Return a file status object that represents the path.
+   * @param f The path we want information from
+   * @return a FileStatus object
+   * @throws java.io.FileNotFoundException when the path does not exist;
+   *         IOException see specific implementation
+   */
+  public S3AFileStatus getFileStatus(Path f) throws IOException {
+    String key = pathToKey(f);
+
+    LOG.info("Getting path status for " + f + " (" + key + ")");
+
+    if (!key.isEmpty()) {
+      try {
+        ObjectMetadata meta = s3.getObjectMetadata(bucket, key);
+        statistics.incrementReadOps(1);
+
+        if (objectRepresentsDirectory(key, meta.getContentLength())) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Found exact file: fake directory");
+          }
+          return new S3AFileStatus(true, true, f.makeQualified(uri, workingDir));
+        } else {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Found exact file: normal file");
+          }
+          return new S3AFileStatus(meta.getContentLength(), dateToLong(meta.getLastModified()),
+              f.makeQualified(uri, workingDir));
+        }
+      } catch (AmazonServiceException e) {
+        if (e.getStatusCode() != 404) {
+          printAmazonServiceException(e);
+          throw e;
+        }
+      } catch (AmazonClientException e) {
+        printAmazonClientException(e);
+        throw e;
+      }
+
+      // Necessary?
+      if (!key.endsWith("/")) {
+        try {
+          String newKey = key + "/";
+          ObjectMetadata meta = s3.getObjectMetadata(bucket, newKey);
+          statistics.incrementReadOps(1);
+
+          if (objectRepresentsDirectory(newKey, meta.getContentLength())) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Found file (with /): fake directory");
+            }
+            return new S3AFileStatus(true, true, f.makeQualified(uri, workingDir));
+          } else {
+            LOG.warn("Found file (with /): real file? should not happen: " + key);
+
+            return new S3AFileStatus(meta.getContentLength(), dateToLong(meta.getLastModified()),
+                f.makeQualified(uri, workingDir));
+          }
+        } catch (AmazonServiceException e) {
+          if (e.getStatusCode() != 404) {
+            printAmazonServiceException(e);
+            throw e;
+          }
+        } catch (AmazonClientException e) {
+          printAmazonClientException(e);
+          throw e;
+        }
+      }
+    }
+
+    try {
+      if (!key.isEmpty() && !key.endsWith("/")) {
+        key = key + "/";
+      }
+      ListObjectsRequest request = new ListObjectsRequest();
+      request.setBucketName(bucket);
+      request.setPrefix(key);
+      request.setDelimiter("/");
+      request.setMaxKeys(1);
+
+      ObjectListing objects = s3.listObjects(request);
+      statistics.incrementReadOps(1);
+
+      if (objects.getCommonPrefixes().size() > 0 || objects.getObjectSummaries().size() > 0) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Found path as directory (with /): " + 
+            objects.getCommonPrefixes().size() + "/" + 
+            objects.getObjectSummaries().size());
+
+          for (S3ObjectSummary summary : objects.getObjectSummaries()) {
+            LOG.debug("Summary: " + summary.getKey() + " " + summary.getSize());
+          }
+          for (String prefix : objects.getCommonPrefixes()) {
+            LOG.debug("Prefix: " + prefix);
+          }
+        }
+
+        return new S3AFileStatus(true, false, f.makeQualified(uri, workingDir));
+      }
+    } catch (AmazonServiceException e) {
+      if (e.getStatusCode() != 404) {
+        printAmazonServiceException(e);
+        throw e;
+      }
+    } catch (AmazonClientException e) {
+      printAmazonClientException(e);
+      throw e;
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Not Found: " + f);
+    }
+    throw new FileNotFoundException("No such file or directory: " + f);
+  }
+
+  /**
+   * The src file is on the local disk.  Add it to FS at
+   * the given dst name.
+   *
+   * This version doesn't need to create a temporary file to calculate the md5.
+   * Sadly this doesn't seem to be used by the shell cp :(
+   *
+   * delSrc indicates if the source should be removed
+   * @param delSrc whether to delete the src
+   * @param overwrite whether to overwrite an existing file
+   * @param src path
+   * @param dst path
+   */
+  @Override
+  public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src, 
+    Path dst) throws IOException {
+    String key = pathToKey(dst);
+
+    if (!overwrite && exists(dst)) {
+      throw new IOException(dst + " already exists");
+    }
+
+    LOG.info("Copying local file from " + src + " to " + dst);
+
+    // Since we have a local file, we don't need to stream into a temporary file
+    LocalFileSystem local = getLocal(getConf());
+    File srcfile = local.pathToFile(src);
+
+    TransferManagerConfiguration transferConfiguration = new TransferManagerConfiguration();
+    transferConfiguration.setMinimumUploadPartSize(partSize);
+    transferConfiguration.setMultipartUploadThreshold(partSizeThreshold);
+
+    TransferManager transfers = new TransferManager(s3);
+    transfers.setConfiguration(transferConfiguration);
+
+    final ObjectMetadata om = new ObjectMetadata();
+    if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
+      om.setServerSideEncryption(serverSideEncryptionAlgorithm);
+    }
+    PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key, srcfile);
+    putObjectRequest.setCannedAcl(cannedACL);
+    putObjectRequest.setMetadata(om);
+
+    ProgressListener progressListener = new ProgressListener() {
+      public void progressChanged(ProgressEvent progressEvent) {
+        switch (progressEvent.getEventCode()) {
+          case ProgressEvent.PART_COMPLETED_EVENT_CODE:
+            statistics.incrementWriteOps(1);
+            break;
+        }
+      }
+    };
+
+    Upload up = transfers.upload(putObjectRequest);
+    up.addProgressListener(progressListener);
+    try {
+      up.waitForUploadResult();
+      statistics.incrementWriteOps(1);
+    } catch (InterruptedException e) {
+      throw new IOException("Got interrupted, cancelling");
+    } finally {
+      transfers.shutdownNow(false);
+    }
+
+    // This will delete unnecessary fake parent directories
+    finishedWrite(key);
+
+    if (delSrc) {
+      local.delete(src, false);
+    }
+  }
+
+  /**
+  * Override getCanonicalServiceName because we don't support tokens in S3A
+  */
+  @Override
+  public String getCanonicalServiceName() {
+    // Does not support Token
+    return null;
+  }
+
+  private void copyFile(String srcKey, String dstKey) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("copyFile " + srcKey + " -> " + dstKey);
+    }
+
+    TransferManagerConfiguration transferConfiguration = new TransferManagerConfiguration();
+    transferConfiguration.setMultipartCopyPartSize(partSize);
+
+    TransferManager transfers = new TransferManager(s3);
+    transfers.setConfiguration(transferConfiguration);
+
+    ObjectMetadata srcom = s3.getObjectMetadata(bucket, srcKey);
+    final ObjectMetadata dstom = srcom.clone();
+    if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
+      dstom.setServerSideEncryption(serverSideEncryptionAlgorithm);
+    }
+    CopyObjectRequest copyObjectRequest = new CopyObjectRequest(bucket, srcKey, bucket, dstKey);
+    copyObjectRequest.setCannedAccessControlList(cannedACL);
+    copyObjectRequest.setNewObjectMetadata(dstom);
+
+    ProgressListener progressListener = new ProgressListener() {
+      public void progressChanged(ProgressEvent progressEvent) {
+        switch (progressEvent.getEventCode()) {
+          case ProgressEvent.PART_COMPLETED_EVENT_CODE:
+            statistics.incrementWriteOps(1);
+            break;
+        }
+      }
+    };
+
+    Copy copy = transfers.copy(copyObjectRequest);
+    copy.addProgressListener(progressListener);
+    try {
+      copy.waitForCopyResult();
+      statistics.incrementWriteOps(1);
+    } catch (InterruptedException e) {
+      throw new IOException("Got interrupted, cancelling");
+    } finally {
+      transfers.shutdownNow(false);
+    }
+  }
+
+  private boolean objectRepresentsDirectory(final String name, final long size) {
+    return !name.isEmpty() && name.charAt(name.length() - 1) == '/' && size == 0L;
+  }
+
+  // Handles null Dates that can be returned by AWS
+  private static long dateToLong(final Date date) {
+    if (date == null) {
+      return 0L;
+    }
+
+    return date.getTime();
+  }
+
+  public void finishedWrite(String key) throws IOException {
+    deleteUnnecessaryFakeDirectories(keyToPath(key).getParent());
+  }
+
+  private void deleteUnnecessaryFakeDirectories(Path f) throws IOException {
+    while (true) {
+      try {
+        String key = pathToKey(f);
+        if (key.isEmpty()) {
+          break;
+        }
+
+        S3AFileStatus status = getFileStatus(f);
+
+        if (status.isDirectory() && status.isEmptyDirectory()) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Deleting fake directory " + key + "/");
+          }
+          s3.deleteObject(bucket, key + "/");
+          statistics.incrementWriteOps(1);
+        }
+      } catch (FileNotFoundException e) {
+      } catch (AmazonServiceException e) {}
+
+      if (f.isRoot()) {
+        break;
+      }
+
+      f = f.getParent();
+    }
+  }
+
+
+  private void createFakeDirectory(final String bucketName, final String objectName)
+      throws AmazonClientException, AmazonServiceException {
+    if (!objectName.endsWith("/")) {
+      createEmptyObject(bucketName, objectName + "/");
+    } else {
+      createEmptyObject(bucketName, objectName);
+    }
+  }
+
+  // Used to create an empty file that represents an empty directory
+  private void createEmptyObject(final String bucketName, final String objectName)
+      throws AmazonClientException, AmazonServiceException {
+    final InputStream im = new InputStream() {
+      @Override
+      public int read() throws IOException {
+        return -1;
+      }
+    };
+
+    final ObjectMetadata om = new ObjectMetadata();
+    om.setContentLength(0L);
+    if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
+      om.setServerSideEncryption(serverSideEncryptionAlgorithm);
+    }
+    PutObjectRequest putObjectRequest = new PutObjectRequest(bucketName, objectName, im, om);
+    putObjectRequest.setCannedAcl(cannedACL);
+    s3.putObject(putObjectRequest);
+    statistics.incrementWriteOps(1);
+  }
+
+  /**
+   * Return the number of bytes that large input files should optimally
+   * be split into to minimize I/O time.
+   * @deprecated use {@link #getDefaultBlockSize(Path)} instead
+   */
+  @Deprecated
+  public long getDefaultBlockSize() {
+    // default to 32MB: large enough to minimize the impact of seeks
+    return getConf().getLong("fs.s3a.block.size", 32 * 1024 * 1024);
+  }
+
+  private void printAmazonServiceException(AmazonServiceException ase) {
+    LOG.info("Caught an AmazonServiceException, which means your request made it " +
+        "to Amazon S3, but was rejected with an error response for some reason.");
+    LOG.info("Error Message: " + ase.getMessage());
+    LOG.info("HTTP Status Code: " + ase.getStatusCode());
+    LOG.info("AWS Error Code: " + ase.getErrorCode());
+    LOG.info("Error Type: " + ase.getErrorType());
+    LOG.info("Request ID: " + ase.getRequestId());
+    LOG.info("Class Name: " + ase.getClass().getName());
+  }
+
+  private void printAmazonClientException(AmazonClientException ace) {
+    LOG.info("Caught an AmazonClientException, which means the client encountered " +
+        "a serious internal problem while trying to communicate with S3, " +
+        "such as not being able to access the network.");
+    LOG.info("Error Message: " + ace.getMessage());
+  }
+}

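With the service entry added under META-INF/services, any path with the s3a:// scheme resolves to this class. A minimal usage sketch with placeholder bucket and paths, exercising the create/open/rename/delete operations implemented above:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class S3AUsageSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();           // credentials come from core-site.xml
    FileSystem fs = FileSystem.get(URI.create("s3a://my-bucket/"), conf);  // placeholder bucket

    Path file = new Path("/tmp/hello.txt");
    try (FSDataOutputStream out = fs.create(file, true)) {   // buffered locally, uploaded on close
      out.writeBytes("hello s3a");
    }
    try (FSDataInputStream in = fs.open(file)) {
      System.out.println((char) in.read());
    }
    fs.rename(file, new Path("/tmp/hello-renamed.txt"));      // server-side copy + delete
    fs.delete(new Path("/tmp"), true);
  }
}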
+ 207 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java

@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.S3Object;
+import com.amazonaws.services.s3.model.S3ObjectInputStream;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FileSystem;
+
+import org.slf4j.Logger;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.net.SocketTimeoutException;
+import java.net.SocketException;
+
+public class S3AInputStream extends FSInputStream {
+  private long pos;
+  private boolean closed;
+  private S3ObjectInputStream wrappedStream;
+  private S3Object wrappedObject;
+  private FileSystem.Statistics stats;
+  private AmazonS3Client client;
+  private String bucket;
+  private String key;
+  private long contentLength;
+  public static final Logger LOG = S3AFileSystem.LOG;
+
+
+  public S3AInputStream(String bucket, String key, long contentLength, AmazonS3Client client,
+                        FileSystem.Statistics stats) {
+    this.bucket = bucket;
+    this.key = key;
+    this.contentLength = contentLength;
+    this.client = client;
+    this.stats = stats;
+    this.pos = 0;
+    this.closed = false;
+    this.wrappedObject = null;
+    this.wrappedStream = null;
+  }
+
+  private void openIfNeeded() throws IOException {
+    if (wrappedObject == null) {
+      reopen(0);
+    }
+  }
+
+  private synchronized void reopen(long pos) throws IOException {
+    if (wrappedStream != null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Aborting old stream to open at pos " + pos);
+      }
+      wrappedStream.abort();
+    }
+
+    if (pos < 0) {
+      throw new EOFException("Trying to seek to a negative offset " + pos);
+    }
+
+    if (contentLength > 0 && pos > contentLength-1) {
+      throw new EOFException("Trying to seek to an offset " + pos + 
+                             " past the end of the file");
+    }
+
+    LOG.info("Actually opening file " + key + " at pos " + pos);
+
+    GetObjectRequest request = new GetObjectRequest(bucket, key);
+    request.setRange(pos, contentLength-1);
+
+    wrappedObject = client.getObject(request);
+    wrappedStream = wrappedObject.getObjectContent();
+
+    if (wrappedStream == null) {
+      throw new IOException("Null IO stream");
+    }
+
+    this.pos = pos;
+  }
+
+  @Override
+  public synchronized long getPos() throws IOException {
+    return pos;
+  }
+
+  @Override
+  public synchronized void seek(long pos) throws IOException {
+    if (this.pos == pos) {
+      return;
+    }
+
+    LOG.info("Reopening " + this.key + " to seek from " + this.pos + " to new offset " + pos);
+    reopen(pos);
+  }
+
+  @Override
+  public boolean seekToNewSource(long targetPos) throws IOException {
+    return false;
+  }
+
+  @Override
+  public synchronized int read() throws IOException {
+    if (closed) {
+      throw new IOException("Stream closed");
+    }
+
+    openIfNeeded();
+
+    int byteRead;
+    try {
+      byteRead = wrappedStream.read();
+    } catch (SocketTimeoutException e) {
+      LOG.info("Got timeout while trying to read from stream, trying to recover " + e);
+      reopen(pos);
+      byteRead = wrappedStream.read();
+    } catch (SocketException e) {
+      LOG.info("Got socket exception while trying to read from stream, trying to recover " + e);
+      reopen(pos);
+      byteRead = wrappedStream.read();
+    }
+
+    if (byteRead >= 0) {
+      pos++;
+    }
+
+    if (stats != null && byteRead >= 0) {
+      stats.incrementBytesRead(1);
+    }
+    return byteRead;
+  }
+
+  @Override
+  public synchronized int read(byte buf[], int off, int len) throws IOException {
+    if (closed) {
+      throw new IOException("Stream closed");
+    }
+
+    openIfNeeded();
+
+    int byteRead;
+    try {
+      byteRead = wrappedStream.read(buf, off, len);
+    } catch (SocketTimeoutException e) {
+      LOG.info("Got timeout while trying to read from stream, trying to recover " + e);
+      reopen(pos);
+      byteRead = wrappedStream.read(buf, off, len);
+    } catch (SocketException e) {
+      LOG.info("Got socket exception while trying to read from stream, trying to recover " + e);
+      reopen(pos);
+      byteRead = wrappedStream.read(buf, off, len);
+    }
+
+    if (byteRead > 0) {
+      pos += byteRead;
+    }
+
+    if (stats != null && byteRead > 0) {
+      stats.incrementBytesRead(byteRead);
+    }
+
+    return byteRead;
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    super.close();
+    closed = true;
+    if (wrappedObject != null) {
+      wrappedObject.close();
+    }
+  }
+
+  @Override
+  public synchronized int available() throws IOException {
+    if (closed) {
+      throw new IOException("Stream closed");
+    }
+    long remaining = this.contentLength - this.pos;
+    if (remaining > Integer.MAX_VALUE) {
+      return Integer.MAX_VALUE;
+    }
+    return (int)remaining;
+  }
+
+  @Override
+  public boolean markSupported() {
+    return false;
+  }
+}
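
A usage sketch of the seek behaviour implemented above: seeking to a new position aborts the current HTTP stream and reopens the object with a ranged GET from that offset. The bucket, path and offsets below are placeholders:

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class S3ASeekExample {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(URI.create("s3a://example-bucket/"),
            new Configuration());
        FSDataInputStream in = fs.open(new Path("/logs/part-00000"));
        try {
          byte[] buf = new byte[4096];
          in.seek(1024 * 1024);           // S3AInputStream reopens at offset 1 MB
          int bytesRead = in.read(buf);   // served from the new ranged GET
          System.out.println("read " + bytesRead + " bytes at pos " + in.getPos());
        } finally {
          in.close();
          fs.close();
        }
      }
    }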

+ 208 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java

@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import com.amazonaws.event.ProgressEvent;
+import com.amazonaws.event.ProgressListener;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.CannedAccessControlList;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.transfer.TransferManager;
+import com.amazonaws.services.s3.transfer.TransferManagerConfiguration;
+import com.amazonaws.services.s3.transfer.Upload;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.util.Progressable;
+
+import org.slf4j.Logger;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import static org.apache.hadoop.fs.s3a.Constants.*;
+
+public class S3AOutputStream extends OutputStream {
+  private OutputStream backupStream;
+  private File backupFile;
+  private boolean closed;
+  private String key;
+  private String bucket;
+  private AmazonS3Client client;
+  private Progressable progress;
+  private long partSize;
+  private int partSizeThreshold;
+  private S3AFileSystem fs;
+  private CannedAccessControlList cannedACL;
+  private FileSystem.Statistics statistics;
+  private LocalDirAllocator lDirAlloc;
+  private String serverSideEncryptionAlgorithm;
+
+  public static final Logger LOG = S3AFileSystem.LOG;
+
+  public S3AOutputStream(Configuration conf, AmazonS3Client client, 
+    S3AFileSystem fs, String bucket, String key, Progressable progress, 
+    CannedAccessControlList cannedACL, FileSystem.Statistics statistics, 
+    String serverSideEncryptionAlgorithm)
+      throws IOException {
+    this.bucket = bucket;
+    this.key = key;
+    this.client = client;
+    this.progress = progress;
+    this.fs = fs;
+    this.cannedACL = cannedACL;
+    this.statistics = statistics;
+    this.serverSideEncryptionAlgorithm = serverSideEncryptionAlgorithm;
+
+    partSize = conf.getLong(NEW_MULTIPART_SIZE, 
+      conf.getLong(OLD_MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE));
+    partSizeThreshold = conf.getInt(NEW_MIN_MULTIPART_THRESHOLD, 
+      conf.getInt(OLD_MIN_MULTIPART_THRESHOLD, DEFAULT_MIN_MULTIPART_THRESHOLD));
+
+    if (conf.get(BUFFER_DIR, null) != null) {
+      lDirAlloc = new LocalDirAllocator(BUFFER_DIR);
+    } else {
+      lDirAlloc = new LocalDirAllocator("${hadoop.tmp.dir}/s3a");
+    }
+
+    backupFile = lDirAlloc.createTmpFileForWrite("output-", LocalDirAllocator.SIZE_UNKNOWN, conf);
+    closed = false;
+
+    LOG.info("OutputStream for key '" + key + "' writing to tempfile: " + this.backupFile);
+
+    this.backupStream = new BufferedOutputStream(new FileOutputStream(backupFile));
+  }
+
+  @Override
+  public void flush() throws IOException {
+    backupStream.flush();
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    if (closed) {
+      return;
+    }
+
+    backupStream.close();
+    LOG.info("OutputStream for key '" + key + "' closed. Now beginning upload");
+    LOG.info("Minimum upload part size: " + partSize + " threshold " + partSizeThreshold);
+
+
+    try {
+      TransferManagerConfiguration transferConfiguration = new TransferManagerConfiguration();
+      transferConfiguration.setMinimumUploadPartSize(partSize);
+      transferConfiguration.setMultipartUploadThreshold(partSizeThreshold);
+
+      TransferManager transfers = new TransferManager(client);
+      transfers.setConfiguration(transferConfiguration);
+
+      final ObjectMetadata om = new ObjectMetadata();
+      if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
+        om.setServerSideEncryption(serverSideEncryptionAlgorithm);
+      }
+      PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key, backupFile);
+      putObjectRequest.setCannedAcl(cannedACL);
+      putObjectRequest.setMetadata(om);
+
+      Upload upload = transfers.upload(putObjectRequest);
+
+      ProgressableProgressListener listener = 
+        new ProgressableProgressListener(upload, progress, statistics);
+      upload.addProgressListener(listener);
+
+      upload.waitForUploadResult();
+
+      long delta = upload.getProgress().getBytesTransferred() - listener.getLastBytesTransferred();
+      if (statistics != null && delta != 0) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("S3A write delta changed after finished: " + delta + " bytes");
+        }
+        statistics.incrementBytesWritten(delta);
+      }
+
+      // This will delete unnecessary fake parent directories
+      fs.finishedWrite(key);
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    } finally {
+      if (!backupFile.delete()) {
+        LOG.warn("Could not delete temporary s3a file: " + backupFile);
+      }
+      super.close();
+      closed = true;
+    }
+
+    LOG.info("OutputStream for key '" + key + "' upload complete");
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    backupStream.write(b);
+  }
+
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    backupStream.write(b, off, len);
+  }
+
+  public static class ProgressableProgressListener implements ProgressListener {
+    private Progressable progress;
+    private FileSystem.Statistics statistics;
+    private long lastBytesTransferred;
+    private Upload upload;
+
+    public ProgressableProgressListener(Upload upload, Progressable progress, 
+      FileSystem.Statistics statistics) {
+      this.upload = upload;
+      this.progress = progress;
+      this.statistics = statistics;
+      this.lastBytesTransferred = 0;
+    }
+
+    public void progressChanged(ProgressEvent progressEvent) {
+      if (progress != null) {
+        progress.progress();
+      }
+
+      // There are 3 http ops here, but this should be close enough for now
+      if (statistics != null &&
+          (progressEvent.getEventCode() == ProgressEvent.PART_STARTED_EVENT_CODE ||
+           progressEvent.getEventCode() == ProgressEvent.COMPLETED_EVENT_CODE)) {
+        statistics.incrementWriteOps(1);
+      }
+
+      long transferred = upload.getProgress().getBytesTransferred();
+      long delta = transferred - lastBytesTransferred;
+      if (statistics != null && delta != 0) {
+        statistics.incrementBytesWritten(delta);
+      }
+
+      lastBytesTransferred = transferred;
+    }
+
+    public long getLastBytesTransferred() {
+      return lastBytesTransferred;
+    }
+  }
+}
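
The stream above spools the whole object to a local temporary file and uploads it on close() through the AWS TransferManager, using the part size, multipart threshold and buffer directory read in its constructor. A tuning sketch with illustrative values, reusing the same Constants keys the constructor references:

    import org.apache.hadoop.conf.Configuration;

    import static org.apache.hadoop.fs.s3a.Constants.*;

    public class S3AUploadTuning {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Values are placeholders; S3AOutputStream reads these keys in its
        // constructor and hands them to TransferManagerConfiguration on close().
        conf.setLong(NEW_MULTIPART_SIZE, 16 * 1024 * 1024);          // 16 MB parts
        conf.setInt(NEW_MIN_MULTIPART_THRESHOLD, 64 * 1024 * 1024);  // multipart above 64 MB
        conf.set(BUFFER_DIR, "/tmp/s3a");                            // local spill directory
      }
    }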

+ 1 - 0
hadoop-tools/hadoop-aws/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem

@@ -15,3 +15,4 @@
 
 org.apache.hadoop.fs.s3.S3FileSystem
 org.apache.hadoop.fs.s3native.NativeS3FileSystem
+org.apache.hadoop.fs.s3a.S3AFileSystem
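
This service entry lets the JDK ServiceLoader discover S3AFileSystem, which is how FileSystem maps the s3a scheme to the implementation once hadoop-aws and the AWS SDK are on the classpath. A small discovery sketch, assuming those jars are present:

    import java.util.ServiceLoader;

    import org.apache.hadoop.fs.FileSystem;

    public class FileSystemServiceListing {
      public static void main(String[] args) {
        // Lists every FileSystem implementation registered through
        // META-INF/services, including the new S3AFileSystem entry above.
        for (FileSystem fs : ServiceLoader.load(FileSystem.class)) {
          System.out.println(fs.getClass().getName());
        }
      }
    }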

+ 43 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/S3AContract.java

@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract.s3a;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractBondedFSContract;
+
+/**
+ * The contract of S3A: only enabled if the test bucket is provided
+ */
+public class S3AContract extends AbstractBondedFSContract {
+
+  public static final String CONTRACT_XML = "contract/s3a.xml";
+
+
+  public S3AContract(Configuration conf) {
+    super(conf);
+    //insert the base features
+    addConfResource(CONTRACT_XML);
+  }
+
+  @Override
+  public String getScheme() {
+    return "s3a";
+  }
+
+}
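
AbstractBondedFSContract only enables the contract tests when a target filesystem is configured; a sketch of binding the contract by hand. The key name fs.contract.test.fs.s3a and the bucket are assumptions for illustration, not taken from this patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.contract.AbstractFSContract;
    import org.apache.hadoop.fs.contract.s3a.S3AContract;

    public class S3AContractBindingCheck {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Assumed key name and placeholder bucket; the bonded contract derives
        // the key from the scheme and disables the tests when it is unset.
        conf.set("fs.contract.test.fs.s3a", "s3a://example-bucket/");
        AbstractFSContract contract = new S3AContract(conf);
        contract.init();
        System.out.println(contract.getScheme() + " enabled=" + contract.isEnabled());
      }
    }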

+ 38 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractCreate.java

@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract.s3a;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractCreateTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+
+public class TestS3AContractCreate extends AbstractContractCreateTest {
+
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new S3AContract(conf);
+  }
+
+  @Override
+  public void testOverwriteEmptyDirectory() throws Throwable {
+    ContractTestUtils.skip(
+        "blobstores can't distinguish empty directories from files");
+  }
+}

+ 31 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractDelete.java

@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract.s3a;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractDeleteTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+public class TestS3AContractDelete extends AbstractContractDeleteTest {
+
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new S3AContract(conf);
+  }
+}

+ 34 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractMkdir.java

@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract.s3a;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractMkdirTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+/**
+ * Test dir operations on S3
+ */
+public class TestS3AContractMkdir extends AbstractContractMkdirTest {
+
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new S3AContract(conf);
+  }
+}

+ 31 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractOpen.java

@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract.s3a;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractOpenTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+public class TestS3AContractOpen extends AbstractContractOpenTest {
+
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new S3AContract(conf);
+  }
+}

+ 64 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractRename.java

@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract.s3a;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractRenameTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.contract.AbstractFSContractTestBase;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
+
+public class TestS3AContractRename extends AbstractContractRenameTest {
+
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new S3AContract(conf);
+  }
+
+  @Override
+  public void testRenameDirIntoExistingDir() throws Throwable {
+    describe("Verify renaming a dir into an existing dir puts the files"
+             +" from the source dir into the existing dir"
+             +" and leaves existing files alone");
+    FileSystem fs = getFileSystem();
+    String sourceSubdir = "source";
+    Path srcDir = path(sourceSubdir);
+    Path srcFilePath = new Path(srcDir, "source-256.txt");
+    byte[] srcDataset = dataset(256, 'a', 'z');
+    writeDataset(fs, srcFilePath, srcDataset, srcDataset.length, 1024, false);
+    Path destDir = path("dest");
+
+    Path destFilePath = new Path(destDir, "dest-512.txt");
+    byte[] destDataset = dataset(512, 'A', 'Z');
+    writeDataset(fs, destFilePath, destDataset, destDataset.length, 1024, false);
+    assertIsFile(destFilePath);
+
+    boolean rename = fs.rename(srcDir, destDir);
+    Path renamedSrcFilePath = new Path(destDir, "source-256.txt");
+    assertIsFile(destFilePath);
+    assertIsFile(renamedSrcFilePath);
+    ContractTestUtils.verifyFileContents(fs, destFilePath, destDataset);
+    assertTrue("rename returned false though the contents were copied", rename);
+  }
+}

+ 35 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractRootDir.java

@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract.s3a;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractRootDirectoryTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+/**
+ * root dir operations against an S3 bucket
+ */
+public class TestS3AContractRootDir extends
+    AbstractContractRootDirectoryTest {
+
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new S3AContract(conf);
+  }
+}

+ 31 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractSeek.java

@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract.s3a;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractSeekTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+public class TestS3AContractSeek extends AbstractContractSeekTest {
+
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new S3AContract(conf);
+  }
+}

+ 327 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3AFileSystemContractBaseTest.java

@@ -0,0 +1,327 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import static org.junit.Assume.*;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystemContractBaseTest;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.util.UUID;
+
+/**
+ *  Tests a live S3 system. If your keys and bucket aren't specified, all tests
+ *  are marked as passed.
+ *
+ *  This uses BlockJUnit4ClassRunner because FileSystemContractBaseTest derives
+ *  from TestCase, which uses the old JUnit 3 runner that doesn't honor
+ *  assumptions properly, making it impossible to skip the tests if we don't
+ *  have a valid bucket.
+ **/
+public class S3AFileSystemContractBaseTest extends FileSystemContractBaseTest {
+  private static final int TEST_BUFFER_SIZE = 128;
+  private static final int MODULUS = 128;
+
+  protected static final Logger LOG = LoggerFactory.getLogger(S3AFileSystemContractBaseTest.class);
+
+  @Override
+  public void setUp() throws Exception {
+    Configuration conf = new Configuration();
+
+    String fsName = conf.get("test.fs.s3a.name", "s3a:///");
+    URI testURI = URI.create(fsName);
+
+    boolean liveTest = !fsName.equals("s3a:///");
+    
+    // This doesn't work with our JUnit 3 style test cases, so instead we'll 
+    // make this whole class not run by default
+    assumeTrue(liveTest);
+    
+    fs = new S3AFileSystem();
+    fs.initialize(testURI, conf);
+    super.setUp();
+  }
+
+  @Override
+  protected void tearDown() throws Exception {
+    if (fs != null) {
+      fs.delete(path("/tests3a"), true);
+    }
+    super.tearDown();
+  }
+
+  @Test(timeout = 10000)
+  public void testMkdirs() throws IOException {
+    // No trailing slash
+    assertTrue(fs.mkdirs(path("/tests3a/a")));
+    assertTrue(fs.exists(path("/tests3a/a")));
+
+    // With trailing slash
+    assertTrue(fs.mkdirs(path("/tests3a/b/")));
+    assertTrue(fs.exists(path("/tests3a/b/")));
+
+    // Two levels deep
+    assertTrue(fs.mkdirs(path("/tests3a/c/a/")));
+    assertTrue(fs.exists(path("/tests3a/c/a/")));
+
+    // Mismatched slashes
+    assertTrue(fs.exists(path("/tests3a/c/a")));
+  }
+
+
+  @Test(timeout=20000)
+  public void testDelete() throws IOException {
+    // Test deleting an empty directory
+    assertTrue(fs.mkdirs(path("/tests3a/d")));
+    assertTrue(fs.delete(path("/tests3a/d"), true));
+    assertFalse(fs.exists(path("/tests3a/d")));
+
+    // Test deleting a deep empty directory
+    assertTrue(fs.mkdirs(path("/tests3a/e/f/g/h")));
+    assertTrue(fs.delete(path("/tests3a/e/f/g"), true));
+    assertFalse(fs.exists(path("/tests3a/e/f/g/h")));
+    assertFalse(fs.exists(path("/tests3a/e/f/g")));
+    assertTrue(fs.exists(path("/tests3a/e/f")));
+
+    // Test delete of just a file
+    writeFile(path("/tests3a/f/f/file"), 1000);
+    assertTrue(fs.exists(path("/tests3a/f/f/file")));
+    assertTrue(fs.delete(path("/tests3a/f/f/file"), false));
+    assertFalse(fs.exists(path("/tests3a/f/f/file")));
+
+
+    // Test delete of a path with files in various directories
+    writeFile(path("/tests3a/g/h/i/file"), 1000);
+    assertTrue(fs.exists(path("/tests3a/g/h/i/file")));
+    writeFile(path("/tests3a/g/h/j/file"), 1000);
+    assertTrue(fs.exists(path("/tests3a/g/h/j/file")));
+    try {
+      assertFalse(fs.delete(path("/tests3a/g/h"), false));
+      fail("Expected delete to fail with recursion turned off");
+    } catch (IOException e) {}
+    assertTrue(fs.exists(path("/tests3a/g/h/j/file")));
+    assertTrue(fs.delete(path("/tests3a/g/h"), true));
+    assertFalse(fs.exists(path("/tests3a/g/h/j")));
+  }
+
+
+  @Test(timeout = 3600000)
+  public void testOpenCreate() throws IOException {
+    try {
+      createAndReadFileTest(1024);
+    } catch (IOException e) {
+      fail(e.getMessage());
+    }
+
+    try {
+      createAndReadFileTest(5 * 1024 * 1024);
+    } catch (IOException e) {
+      fail(e.getMessage());
+    }
+
+    try {
+      createAndReadFileTest(20 * 1024 * 1024);
+    } catch (IOException e) {
+      fail(e.getMessage());
+    }
+
+    /*
+    Enable to test the multipart upload
+    try {
+      createAndReadFileTest((long)6 * 1024 * 1024 * 1024);
+    } catch (IOException e) {
+      fail(e.getMessage());
+    }
+    */
+  }
+
+  @Test(timeout = 1200000)
+  public void testRenameFile() throws IOException {
+    Path srcPath = path("/tests3a/a/srcfile");
+
+    final OutputStream outputStream = fs.create(srcPath, false);
+    generateTestData(outputStream, 11 * 1024 * 1024);
+    outputStream.close();
+
+    assertTrue(fs.exists(srcPath));
+
+    Path dstPath = path("/tests3a/b/dstfile");
+
+    assertFalse(fs.rename(srcPath, dstPath));
+    assertTrue(fs.mkdirs(dstPath.getParent()));
+    assertTrue(fs.rename(srcPath, dstPath));
+    assertTrue(fs.exists(dstPath));
+    assertFalse(fs.exists(srcPath));
+    assertTrue(fs.exists(srcPath.getParent()));
+  }
+
+
+  @Test(timeout = 10000)
+  public void testRenameDirectory() throws IOException {
+    Path srcPath = path("/tests3a/a");
+
+    assertTrue(fs.mkdirs(srcPath));
+    writeFile(new Path(srcPath, "b/testfile"), 1024);
+
+    Path nonEmptyPath = path("/tests3a/nonempty");
+    writeFile(new Path(nonEmptyPath, "b/testfile"), 1024);
+
+    assertFalse(fs.rename(srcPath, nonEmptyPath));
+
+    Path dstPath = path("/tests3a/b");
+    assertTrue(fs.rename(srcPath, dstPath));
+    assertFalse(fs.exists(srcPath));
+    assertTrue(fs.exists(new Path(dstPath, "b/testfile")));
+  }
+
+
+  @Test(timeout=10000)
+  public void testSeek() throws IOException {
+    Path path = path("/tests3a/testfile.seek");
+    writeFile(path, TEST_BUFFER_SIZE * 10);
+
+
+    FSDataInputStream inputStream = fs.open(path, TEST_BUFFER_SIZE);
+    inputStream.seek(inputStream.getPos() + MODULUS);
+
+    testReceivedData(inputStream, TEST_BUFFER_SIZE * 10 - MODULUS);
+  }
+
+  /**
+   * Creates and reads a file with the given size in S3. The test file is 
+   * generated according to a specific pattern.
+   * During the read phase the incoming data stream is also checked against this pattern.
+   *
+   * @param fileSize
+   * the size of the file to be generated in bytes
+   * @throws IOException
+   * thrown if an I/O error occurs while writing or reading the test file
+   */
+  private void createAndReadFileTest(final long fileSize) throws IOException {
+    final String objectName = UUID.randomUUID().toString();
+    final Path objectPath = new Path("/tests3a/", objectName);
+
+    // Write test file to S3
+    final OutputStream outputStream = fs.create(objectPath, false);
+    generateTestData(outputStream, fileSize);
+    outputStream.close();
+
+    // Now read the same file back from S3
+    final InputStream inputStream = fs.open(objectPath);
+    testReceivedData(inputStream, fileSize);
+    inputStream.close();
+
+    // Delete test file
+    fs.delete(objectPath, false);
+  }
+
+
+  /**
+   * Receives test data from the given input stream and checks the size of the 
+   * data as well as the pattern inside the received data.
+   *
+   * @param inputStream
+   * the input stream to read the test data from
+   * @param expectedSize
+   * the expected size of the data to be read from the input stream in bytes
+   * @throws IOException
+   * thrown if an error occurs while reading the data
+   */
+  private void testReceivedData(final InputStream inputStream, 
+    final long expectedSize) throws IOException {
+    final byte[] testBuffer = new byte[TEST_BUFFER_SIZE];
+
+    long totalBytesRead = 0;
+    int nextExpectedNumber = 0;
+    while (true) {
+      final int bytesRead = inputStream.read(testBuffer);
+      if (bytesRead < 0) {
+        break;
+      }
+
+      totalBytesRead += bytesRead;
+
+      for (int i = 0; i < bytesRead; ++i) {
+        if (testBuffer[i] != nextExpectedNumber) {
+          throw new IOException("Read number " + testBuffer[i] + " but expected "
+                                + nextExpectedNumber);
+        }
+
+        ++nextExpectedNumber;
+
+        if (nextExpectedNumber == MODULUS) {
+          nextExpectedNumber = 0;
+        }
+      }
+    }
+
+    if (totalBytesRead != expectedSize) {
+      throw new IOException("Expected to read " + expectedSize + 
+                            " bytes but only received " + totalBytesRead);
+    }
+  }
+
+
+  /**
+   * Generates test data of the given size according to some specific pattern
+   * and writes it to the provided output stream.
+   *
+   * @param outputStream
+   * the output stream to write the data to
+   * @param size
+   * the size of the test data to be generated in bytes
+   * @throws IOException
+   * thrown if an error occurs while writing the data
+   */
+  private void generateTestData(final OutputStream outputStream, 
+    final long size) throws IOException {
+
+    final byte[] testBuffer = new byte[TEST_BUFFER_SIZE];
+    for (int i = 0; i < testBuffer.length; ++i) {
+      testBuffer[i] = (byte) (i % MODULUS);
+    }
+
+    long bytesWritten = 0;
+    while (bytesWritten < size) {
+
+      final long diff = size - bytesWritten;
+      if (diff < testBuffer.length) {
+        outputStream.write(testBuffer, 0, (int)diff);
+        bytesWritten += diff;
+      } else {
+        outputStream.write(testBuffer);
+        bytesWritten += testBuffer.length;
+      }
+    }
+  }
+
+  private void writeFile(Path name, int fileSize) throws IOException {
+    final OutputStream outputStream = fs.create(name, false);
+    generateTestData(outputStream, fileSize);
+    outputStream.close();
+  }
+}
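
generateTestData() and testReceivedData() above use a cyclic byte pattern 0..MODULUS-1; a standalone round trip of the same generation and verification logic, independent of S3:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    public class PatternRoundTrip {
      private static final int MODULUS = 128;

      public static void main(String[] args) throws IOException {
        // Generate 1000 bytes of the cyclic 0..127 pattern, as generateTestData() does.
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (int i = 0; i < 1000; i++) {
          out.write(i % MODULUS);
        }
        // Verify the stream the same way testReceivedData() does.
        ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
        int expected = 0;
        int b;
        long total = 0;
        while ((b = in.read()) >= 0) {
          if (b != expected) {
            throw new IOException("Read " + b + " but expected " + expected);
          }
          expected = (expected + 1) % MODULUS;
          total++;
        }
        System.out.println("verified " + total + " bytes");
      }
    }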

+ 105 - 0
hadoop-tools/hadoop-aws/src/test/resources/contract/s3a.xml

@@ -0,0 +1,105 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~  or more contributor license agreements.  See the NOTICE file
+  ~  distributed with this work for additional information
+  ~  regarding copyright ownership.  The ASF licenses this file
+  ~  to you under the Apache License, Version 2.0 (the
+  ~  "License"); you may not use this file except in compliance
+  ~  with the License.  You may obtain a copy of the License at
+  ~
+  ~       http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~  Unless required by applicable law or agreed to in writing, software
+  ~  distributed under the License is distributed on an "AS IS" BASIS,
+  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~  See the License for the specific language governing permissions and
+  ~  limitations under the License.
+  -->
+
+<configuration>
+  <!--
+  S3A is a blobstore, with very different behavior than a
+  classic filesystem.
+  -->
+
+  <property>
+    <name>fs.contract.test.root-tests-enabled</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.test.random-seek-count</name>
+    <value>10</value>
+  </property>
+
+  <property>
+    <name>fs.contract.is-blobstore</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.is-case-sensitive</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.rename-returns-false-if-source-missing</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-append</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-atomic-directory-delete</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-atomic-rename</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-block-locality</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-concat</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-seek</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-seek-on-closed-file</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.rejects-seek-past-eof</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-strict-exceptions</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-unix-permissions</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>fs.contract.rename-overwrites-dest</name>
+    <value>true</value>
+  </property>
+
+</configuration>

+ 6 - 1
hadoop-tools/hadoop-aws/src/test/resources/contract/s3n.xml

@@ -77,6 +77,11 @@
     <value>true</value>
   </property>
 
+  <property>
+    <name>fs.contract.supports-seek-on-closed-file</name>
+    <value>true</value>
+  </property>
+
   <property>
     <name>fs.contract.rejects-seek-past-eof</name>
     <value>true</value>
@@ -92,4 +97,4 @@
     <value>false</value>
   </property>
 
-</configuration>
+</configuration>