Przeglądaj źródła

HADOOP-16306. AliyunOSS: Remove temporary files when upload small files to OSS. Contributed by wujinhu.

(cherry picked from commit 2d8282bb8248e6984878626c4cdc7148aa2e7202)
Weiwei Yang 6 lat temu
rodzic
commit
afe2b98daf

+ 9 - 1
hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSBlockOutputStream.java

@@ -124,7 +124,7 @@ public class AliyunOSSBlockOutputStream extends OutputStream {
             new ArrayList<>(partETags));
       }
     } finally {
-      removePartFiles();
+      removeTemporaryFiles();
       closed = true;
     }
   }
@@ -149,6 +149,14 @@ public class AliyunOSSBlockOutputStream extends OutputStream {
     }
   }
 
+  private void removeTemporaryFiles() {
+    for (File file : blockFiles.values()) {
+      if (file != null && file.exists() && !file.delete()) {
+        LOG.warn("Failed to delete temporary file {}", file);
+      }
+    }
+  }
+
   private void removePartFiles() throws IOException {
     for (ListenableFuture<PartETag> partETagFuture : partETagsFutures) {
       if (!partETagFuture.isDone()) {

+ 30 - 2
hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSBlockOutputStream.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.fs.aliyun.oss;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
@@ -30,7 +31,9 @@ import org.junit.rules.Timeout;
 
 import java.io.IOException;
 
+import static org.apache.hadoop.fs.aliyun.oss.Constants.BUFFER_DIR_KEY;
 import static org.apache.hadoop.fs.aliyun.oss.Constants.MULTIPART_UPLOAD_PART_SIZE_DEFAULT;
+import static org.apache.hadoop.fs.aliyun.oss.Constants.MULTIPART_UPLOAD_PART_SIZE_KEY;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.IO_CHUNK_BUFFER_SIZE;
 import static org.junit.Assert.assertEquals;
 
@@ -49,9 +52,9 @@ public class TestAliyunOSSBlockOutputStream {
   @Before
   public void setUp() throws Exception {
     Configuration conf = new Configuration();
-    conf.setInt(Constants.MULTIPART_UPLOAD_PART_SIZE_KEY, 1024 * 1024);
+    conf.setInt(MULTIPART_UPLOAD_PART_SIZE_KEY, 1024 * 1024);
     conf.setInt(IO_CHUNK_BUFFER_SIZE,
-        conf.getInt(Constants.MULTIPART_UPLOAD_PART_SIZE_KEY, 0));
+        conf.getInt(MULTIPART_UPLOAD_PART_SIZE_KEY, 0));
     conf.setInt(Constants.UPLOAD_ACTIVE_BLOCKS_KEY, 20);
     fs = AliyunOSSTestUtils.createTestFileSystem(conf);
   }
@@ -70,6 +73,7 @@ public class TestAliyunOSSBlockOutputStream {
   @Test
   public void testZeroByteUpload() throws IOException {
     ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 0);
+    bufferDirShouldEmpty();
   }
 
   @Test
@@ -106,6 +110,7 @@ public class TestAliyunOSSBlockOutputStream {
     assertEquals(3 * size, statistics.getBytesRead());
     assertEquals(10, statistics.getWriteOps());
     assertEquals(3 * size, statistics.getBytesWritten());
+    bufferDirShouldEmpty();
   }
 
   @Test
@@ -131,6 +136,7 @@ public class TestAliyunOSSBlockOutputStream {
     assertEquals(3 * size, statistics.getBytesRead());
     assertEquals(25, statistics.getWriteOps());
     assertEquals(3 * size, statistics.getBytesWritten());
+    bufferDirShouldEmpty();
   }
 
   @Test
@@ -144,6 +150,7 @@ public class TestAliyunOSSBlockOutputStream {
     assertEquals(size, statistics.getBytesRead());
     assertEquals(52, statistics.getWriteOps());
     assertEquals(size, statistics.getBytesWritten());
+    bufferDirShouldEmpty();
   }
 
   @Test
@@ -154,6 +161,7 @@ public class TestAliyunOSSBlockOutputStream {
         MULTIPART_UPLOAD_PART_SIZE_DEFAULT);
     ContractTestUtils.createAndVerifyFile(fs, getTestPath(),
         MULTIPART_UPLOAD_PART_SIZE_DEFAULT + 1);
+    bufferDirShouldEmpty();
   }
 
   @Test
@@ -174,4 +182,24 @@ public class TestAliyunOSSBlockOutputStream {
     assert(10001 * 100 * 1024 / partSize4
         < Constants.MULTIPART_UPLOAD_PART_NUM_LIMIT);
   }
+
+  @Test
+  /**
+   * This test is used to verify HADOOP-16306.
+   * Test small file uploading so that oss fs will upload file directly
+   * instead of multi part upload.
+   */
+  public void testSmallUpload() throws IOException {
+    long size = fs.getConf().getInt(MULTIPART_UPLOAD_PART_SIZE_KEY, 1024);
+    ContractTestUtils.createAndVerifyFile(fs, getTestPath(), size - 1);
+    bufferDirShouldEmpty();
+  }
+
+  private void bufferDirShouldEmpty() throws IOException {
+    Path bufferPath = new Path(fs.getConf().get(BUFFER_DIR_KEY));
+    FileStatus[] files = bufferPath.getFileSystem(
+        fs.getConf()).listStatus(bufferPath);
+    // Temporary file should be deleted
+    assertEquals(0, files.length);
+  }
 }