ソースを参照

HADOOP-18873. ABFS: AbfsOutputStream doesnt close DataBlocks object. (#6105)

AbfsOutputStream to close the dataBlock object created for the upload.

Contributed By: Pranav Saxena
Pranav Saxena 1 年間 前
コミット
a3e132e20c

+ 2 - 2
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/DataBlocks.java

@@ -329,7 +329,7 @@ public final class DataBlocks {
    */
   public static abstract class DataBlock implements Closeable {
 
-    enum DestState {Writing, Upload, Closed}
+    public enum DestState {Writing, Upload, Closed}
 
     private volatile DestState state = Writing;
     private final long index;
@@ -375,7 +375,7 @@ public final class DataBlocks {
      *
      * @return the current state.
      */
-    final DestState getState() {
+    public final DestState getState() {
       return state;
     }
 

+ 1 - 1
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java

@@ -330,7 +330,7 @@ public class AzureBlobFileSystem extends FileSystem
     try {
       TracingContext tracingContext = new TracingContext(clientCorrelationId,
           fileSystemId, FSOperationType.CREATE, overwrite, tracingHeaderFormat, listener);
-      OutputStream outputStream = abfsStore.createFile(qualifiedPath, statistics, overwrite,
+      OutputStream outputStream = getAbfsStore().createFile(qualifiedPath, statistics, overwrite,
           permission == null ? FsPermission.getFileDefault() : permission,
           FsPermission.getUMask(getConf()), tracingContext);
       statIncrement(FILES_CREATED);

+ 6 - 1
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java

@@ -707,7 +707,7 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
             .withWriteMaxConcurrentRequestCount(abfsConfiguration.getWriteMaxConcurrentRequestCount())
             .withMaxWriteRequestsToQueue(abfsConfiguration.getMaxWriteRequestsToQueue())
             .withLease(lease)
-            .withBlockFactory(blockFactory)
+            .withBlockFactory(getBlockFactory())
             .withBlockOutputActiveBlocks(blockOutputActiveBlocks)
             .withClient(client)
             .withPosition(position)
@@ -1940,6 +1940,11 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
     this.client = client;
   }
 
+  @VisibleForTesting
+  DataBlocks.BlockFactory getBlockFactory() {
+    return blockFactory;
+  }
+
   @VisibleForTesting
   void setNamespaceEnabled(Trilean isNamespaceEnabled){
     this.isNamespaceEnabled = isNamespaceEnabled;

+ 1 - 1
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java

@@ -344,7 +344,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
             outputStreamStatistics.uploadSuccessful(bytesLength);
             return null;
           } finally {
-            IOUtils.close(blockUploadData);
+            IOUtils.close(blockUploadData, blockToUpload);
           }
         });
     writeOperations.add(new WriteOperation(job, offset, bytesLength));

+ 59 - 0
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java

@@ -20,15 +20,31 @@ package org.apache.hadoop.fs.azurebfs;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.OutputStream;
+import java.util.HashSet;
 import java.util.Random;
+import java.util.Set;
 
+import org.assertj.core.api.Assertions;
 import org.junit.Test;
+import org.mockito.Mockito;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
 import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.store.BlockUploadStatistics;
+import org.apache.hadoop.fs.store.DataBlocks;
+
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.DATA_BLOCKS_BUFFER;
+import static org.apache.hadoop.fs.store.DataBlocks.DATA_BLOCKS_BUFFER_ARRAY;
+import static org.apache.hadoop.fs.store.DataBlocks.DATA_BLOCKS_BUFFER_DISK;
+import static org.apache.hadoop.fs.store.DataBlocks.DATA_BLOCKS_BYTEBUFFER;
+import static org.apache.hadoop.fs.store.DataBlocks.DataBlock.DestState.Closed;
+import static org.apache.hadoop.fs.store.DataBlocks.DataBlock.DestState.Writing;
 
 /**
  * Test append operations.
@@ -90,4 +106,47 @@ public class ITestAzureBlobFileSystemAppend extends
         fs.getFileSystemId(), FSOperationType.APPEND, false, 0));
     fs.append(testPath, 10);
   }
+
+  @Test
+  public void testCloseOfDataBlockOnAppendComplete() throws Exception {
+    Set<String> blockBufferTypes = new HashSet<>();
+    blockBufferTypes.add(DATA_BLOCKS_BUFFER_DISK);
+    blockBufferTypes.add(DATA_BLOCKS_BYTEBUFFER);
+    blockBufferTypes.add(DATA_BLOCKS_BUFFER_ARRAY);
+    for (String blockBufferType : blockBufferTypes) {
+      Configuration configuration = new Configuration(getRawConfiguration());
+      configuration.set(DATA_BLOCKS_BUFFER, blockBufferType);
+      AzureBlobFileSystem fs = Mockito.spy(
+          (AzureBlobFileSystem) FileSystem.newInstance(configuration));
+      AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
+      Mockito.doReturn(store).when(fs).getAbfsStore();
+      DataBlocks.DataBlock[] dataBlock = new DataBlocks.DataBlock[1];
+      Mockito.doAnswer(getBlobFactoryInvocation -> {
+        DataBlocks.BlockFactory factory = Mockito.spy(
+            (DataBlocks.BlockFactory) getBlobFactoryInvocation.callRealMethod());
+        Mockito.doAnswer(factoryCreateInvocation -> {
+              dataBlock[0] = Mockito.spy(
+                  (DataBlocks.DataBlock) factoryCreateInvocation.callRealMethod());
+              return dataBlock[0];
+            })
+            .when(factory)
+            .create(Mockito.anyLong(), Mockito.anyInt(), Mockito.any(
+                BlockUploadStatistics.class));
+        return factory;
+      }).when(store).getBlockFactory();
+      try (OutputStream os = fs.create(
+          new Path(getMethodName() + "_" + blockBufferType))) {
+        os.write(new byte[1]);
+        Assertions.assertThat(dataBlock[0].getState())
+            .describedAs(
+                "On write of data in outputStream, state should become Writing")
+            .isEqualTo(Writing);
+        os.close();
+        Mockito.verify(dataBlock[0], Mockito.times(1)).close();
+        Assertions.assertThat(dataBlock[0].getState())
+            .describedAs("On close of outputStream, state should become Closed")
+            .isEqualTo(Closed);
+      }
+    }
+  }
 }