Browse Source

HADOOP-14423. S3Guard will set file length to -1 on a putObjectDirect(stream, -1) call. Contributed by Steve Loughran

Mingliang Liu 8 years ago
parent
commit
886d680e1d

+ 34 - 10
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

@@ -1161,8 +1161,9 @@ public class S3AFileSystem extends FileSystem {
    * @param inputStream source data.
    * @return the request
    */
-  private PutObjectRequest newPutObjectRequest(String key,
-      ObjectMetadata metadata, InputStream inputStream) {
+  PutObjectRequest newPutObjectRequest(String key,
+      ObjectMetadata metadata,
+      InputStream inputStream) {
     Preconditions.checkNotNull(inputStream);
     PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key,
         inputStream, metadata);
@@ -1240,14 +1241,10 @@ public class S3AFileSystem extends FileSystem {
    * @return the upload initiated
    * @throws AmazonClientException on problems
    */
-  public PutObjectResult putObjectDirect(PutObjectRequest putObjectRequest)
+  PutObjectResult putObjectDirect(PutObjectRequest putObjectRequest)
       throws AmazonClientException {
-    long len;
-    if (putObjectRequest.getFile() != null) {
-      len = putObjectRequest.getFile().length();
-    } else {
-      len = putObjectRequest.getMetadata().getContentLength();
-    }
+    long len = getPutRequestLength(putObjectRequest);
+    LOG.debug("PUT {} bytes to {}", len, putObjectRequest.getKey());
     incrementPutStartStatistics(len);
     try {
       PutObjectResult result = s3.putObject(putObjectRequest);
@@ -1259,6 +1256,23 @@ public class S3AFileSystem extends FileSystem {
     }
   }
 
+  /**
+   * Get the length of the PUT, verifying that the length is known.
+   * @param putObjectRequest a request bound to a file or a stream.
+   * @return the request length
+   * @throws IllegalStateException if the length is negative
+   */
+  private long getPutRequestLength(PutObjectRequest putObjectRequest) {
+    long len;
+    if (putObjectRequest.getFile() != null) {
+      len = putObjectRequest.getFile().length();
+    } else {
+      len = putObjectRequest.getMetadata().getContentLength();
+    }
+    Preconditions.checkState(len >= 0, "Cannot PUT object of unknown length");
+    return len;
+  }
+
   /**
    * Upload part of a multi-partition file.
    * Increments the write and put counters.
@@ -2257,13 +2271,23 @@ public class S3AFileSystem extends FileSystem {
 
   /**
    * Perform post-write actions.
+   * This operation MUST be called after any PUT/multipart PUT completes
+   * successfully.
+   * This includes
+   * <ol>
+   *   <li>Calling {@link #deleteUnnecessaryFakeDirectories(Path)}</li>
+   *   <li>Updating any metadata store with details on the newly created
+   *   object.</li>
+   * </ol>
    * @param key key written to
    * @param length  total length of file written
    */
-  public void finishedWrite(String key, long length) {
+  @InterfaceAudience.Private
+  void finishedWrite(String key, long length) {
     LOG.debug("Finished write to {}, len {}", key, length);
     Path p = keyToQualifiedPath(key);
     deleteUnnecessaryFakeDirectories(p.getParent());
+    Preconditions.checkArgument(length >= 0, "content length is negative");
 
     // See note about failure semantics in s3guard.md doc.
     try {

+ 20 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java

@@ -22,8 +22,13 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.test.LambdaTestUtils;
+
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PutObjectRequest;
 import org.junit.Test;
 
+import java.io.ByteArrayInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 
@@ -55,6 +60,21 @@ public class ITestS3AMiscOperations extends AbstractS3ATestBase {
     createNonRecursive(new Path(parent, "fail"));
   }
 
+  @Test
+  public void testPutObjectDirect() throws Throwable {
+    S3AFileSystem fs = getFileSystem();
+    ObjectMetadata metadata = fs.newObjectMetadata(-1);
+    metadata.setContentLength(-1); // negative length: the PUT must be rejected
+    Path path = path("putDirect");
+    final PutObjectRequest put = new PutObjectRequest(fs.getBucket(),
+        path.toUri().getPath(),
+        new ByteArrayInputStream("PUT".getBytes()),
+        metadata);
+    LambdaTestUtils.intercept(IllegalStateException.class,
+        () -> fs.putObjectDirect(put));
+    assertPathDoesNotExist("put object was created", path);
+  }
+
   private FSDataOutputStream createNonRecursive(Path path) throws IOException {
     return getFileSystem().createNonRecursive(path, false, 4096,
         (short) 3, (short) 4096,