Browse Source

HADOOP-16040. ABFS: Bug fix for tolerateOobAppends configuration.

Contributed by Da Zhou.

(cherry picked from commit e8d19003695e3bc76bfa8e1187a238eec0220def)
Da Zhou 6 years ago
parent
commit
f7de630e85

+ 2 - 1
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java

@@ -374,7 +374,8 @@ public class AzureBlobFileSystemStore {
     // Add statistics for InputStream
     return new AbfsInputStream(client, statistics,
             AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), contentLength,
-                abfsConfiguration.getReadBufferSize(), abfsConfiguration.getReadAheadQueueDepth(), eTag);
+                abfsConfiguration.getReadBufferSize(), abfsConfiguration.getReadAheadQueueDepth(),
+                abfsConfiguration.getTolerateOobAppends(), eTag);
   }
 
   public OutputStream openFileForWrite(final Path path, final boolean overwrite) throws

+ 2 - 1
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java

@@ -61,6 +61,7 @@ public class AbfsInputStream extends FSInputStream {
       final long contentLength,
       final int bufferSize,
       final int readAheadQueueDepth,
+      final boolean tolerateOobAppends,
       final String eTag) {
     this.client = client;
     this.statistics = statistics;
@@ -68,8 +69,8 @@ public class AbfsInputStream extends FSInputStream {
     this.contentLength = contentLength;
     this.bufferSize = bufferSize;
     this.readAheadQueueDepth = (readAheadQueueDepth >= 0) ? readAheadQueueDepth : Runtime.getRuntime().availableProcessors();
+    this.tolerateOobAppends = tolerateOobAppends;
     this.eTag = eTag;
-    this.tolerateOobAppends = false;
     this.readAheadEnabled = true;
   }
 

+ 35 - 2
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2E.java

@@ -25,12 +25,14 @@ import java.util.Random;
 
 import org.junit.Test;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
 
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_TOLERATE_CONCURRENT_APPEND;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 
 /**
@@ -66,7 +68,9 @@ public class ITestAzureBlobFileSystemE2E extends AbstractAbfsIntegrationTest {
   }
 
   @Test (expected = IOException.class)
-  public void testOOBWrites() throws Exception {
+  public void testOOBWritesAndReadFail() throws Exception {
+    Configuration conf = this.getRawConfiguration();
+    conf.setBoolean(AZURE_TOLERATE_CONCURRENT_APPEND, false);
     final AzureBlobFileSystem fs = getFileSystem();
     int readBufferSize = fs.getAbfsStore().getAbfsConfiguration().getReadBufferSize();
 
@@ -83,7 +87,6 @@ public class ITestAzureBlobFileSystemE2E extends AbstractAbfsIntegrationTest {
     try (FSDataInputStream readStream = fs.open(testFilePath)) {
       assertEquals(readBufferSize,
           readStream.read(bytesToRead, 0, readBufferSize));
-
       try (FSDataOutputStream writeStream = fs.create(testFilePath)) {
         writeStream.write(b);
         writeStream.flush();
@@ -94,6 +97,36 @@ public class ITestAzureBlobFileSystemE2E extends AbstractAbfsIntegrationTest {
     }
   }
 
+  @Test
+  public void testOOBWritesAndReadSucceed() throws Exception {
+    Configuration conf = this.getRawConfiguration();
+    conf.setBoolean(AZURE_TOLERATE_CONCURRENT_APPEND, true);
+    final AzureBlobFileSystem fs = getFileSystem(conf);
+    int readBufferSize = fs.getAbfsStore().getAbfsConfiguration().getReadBufferSize();
+
+    byte[] bytesToRead = new byte[readBufferSize];
+    final byte[] b = new byte[2 * readBufferSize];
+    new Random().nextBytes(b);
+    final Path testFilePath = new Path(methodName.getMethodName());
+
+    try (FSDataOutputStream writeStream = fs.create(testFilePath)) {
+      writeStream.write(b);
+      writeStream.flush();
+    }
+
+    try (FSDataInputStream readStream = fs.open(testFilePath)) {
+      // Read
+      assertEquals(readBufferSize, readStream.read(bytesToRead, 0, readBufferSize));
+      // Concurrent write
+      try (FSDataOutputStream writeStream = fs.create(testFilePath)) {
+        writeStream.write(b);
+        writeStream.flush();
+      }
+
+      assertEquals(readBufferSize, readStream.read(bytesToRead, 0, readBufferSize));
+    }
+  }
+
   @Test
   public void testWriteWithBufferOffset() throws Exception {
     final AzureBlobFileSystem fs = getFileSystem();