Browse Source

HADOOP-17301. ABFS: read-ahead error reporting breaks buffer management (#2369)

Fixes read-ahead buffer management issues introduced by HADOOP-16852,
 "ABFS: Send error back to client for Read Ahead request failure".

Contributed by Sneha Vijayarajan
Sneha Vijayarajan 4 years ago
parent
commit
c4fff74cc5

+ 33 - 3
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java

@@ -22,6 +22,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.Queue;
@@ -218,6 +219,8 @@ final class ReadBufferManager {
       return false;  // there are no evict-able buffers
     }
 
+    long currentTimeInMs = currentTimeMillis();
+
     // first, try buffers where all bytes have been consumed (approximated as first and last bytes consumed)
     for (ReadBuffer buf : completedReadList) {
       if (buf.isFirstByteConsumed() && buf.isLastByteConsumed()) {
@@ -242,14 +245,30 @@ final class ReadBufferManager {
     }
 
     // next, try any old nodes that have not been consumed
+    // Failed read buffers (with buffer index=-1) that are older than
+    // thresholdAge should be cleaned up, but at the same time should not
+    // report successful eviction.
+    // Queue logic expects that a buffer is freed up for read ahead when
+    // eviction is successful, whereas a failed ReadBuffer would have released
+    // its buffer when its status was set to READ_FAILED.
     long earliestBirthday = Long.MAX_VALUE;
+    ArrayList<ReadBuffer> oldFailedBuffers = new ArrayList<>();
     for (ReadBuffer buf : completedReadList) {
-      if (buf.getTimeStamp() < earliestBirthday) {
+      if ((buf.getBufferindex() != -1)
+          && (buf.getTimeStamp() < earliestBirthday)) {
         nodeToEvict = buf;
         earliestBirthday = buf.getTimeStamp();
+      } else if ((buf.getBufferindex() == -1)
+          && (currentTimeInMs - buf.getTimeStamp()) > thresholdAgeMilliseconds) {
+        oldFailedBuffers.add(buf);
       }
     }
-    if ((currentTimeMillis() - earliestBirthday > thresholdAgeMilliseconds) && (nodeToEvict != null)) {
+
+    for (ReadBuffer buf : oldFailedBuffers) {
+      evict(buf);
+    }
+
+    if ((currentTimeInMs - earliestBirthday > thresholdAgeMilliseconds) && (nodeToEvict != null)) {
       return evict(nodeToEvict);
     }
 
@@ -417,7 +436,6 @@ final class ReadBufferManager {
       if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) {
         buffer.setStatus(ReadBufferStatus.AVAILABLE);
         buffer.setLength(bytesActuallyRead);
-        completedReadList.add(buffer);
       } else {
         freeList.push(buffer.getBufferindex());
         // buffer will be deleted as per the eviction policy.
@@ -464,4 +482,16 @@ final class ReadBufferManager {
   void callTryEvict() {
     tryEvict();
   }
+
+  /**
+   * Test method that can mimic no free buffers scenario and also add a ReadBuffer
+   * into completedReadList. This readBuffer will get picked up by TryEvict()
+   * next time a new queue request comes in.
+   * @param buf that needs to be added to completedReadlist
+   */
+  @VisibleForTesting
+  void testMimicFullUseAndAddFailedBuffer(ReadBuffer buf) {
+    freeList.clear();
+    completedReadList.add(buf);
+  }
 }

+ 49 - 0
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java

@@ -23,9 +23,12 @@ import java.io.IOException;
 import org.junit.Assert;
 import org.junit.Test;
 
+import org.assertj.core.api.Assertions;
+
 import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TimeoutException;
+import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
 import org.apache.hadoop.fs.azurebfs.utils.TestCachedSASToken;
 
 import static org.mockito.ArgumentMatchers.any;
@@ -49,6 +52,8 @@ public class TestAbfsInputStream extends
   private static final int TWO_KB = 2 * 1024;
   private static final int THREE_KB = 3 * 1024;
   private static final int REDUCED_READ_BUFFER_AGE_THRESHOLD = 3000; // 3 sec
+  private static final int INCREASED_READ_BUFFER_AGE_THRESHOLD =
+      REDUCED_READ_BUFFER_AGE_THRESHOLD * 10; // 30 sec
 
   private AbfsRestOperation getMockRestOp() {
     AbfsRestOperation op = mock(AbfsRestOperation.class);
@@ -182,7 +187,38 @@ public class TestAbfsInputStream extends
     checkEvictedStatus(inputStream, 0, false);
   }
 
+  @Test
+  public void testFailedReadAheadEviction() throws Exception {
+    AbfsClient client = getMockAbfsClient();
+    AbfsRestOperation successOp = getMockRestOp();
+    ReadBufferManager.setThresholdAgeMilliseconds(INCREASED_READ_BUFFER_AGE_THRESHOLD);
+    // Stub :
+    // Read request leads to 3 readahead calls: Fail all 3 readahead-client.read()
+    // Actual read request fails with the failure in readahead thread
+    doThrow(new TimeoutException("Internal Server error"))
+        .when(client)
+        .read(any(String.class), any(Long.class), any(byte[].class),
+            any(Integer.class), any(Integer.class), any(String.class),
+            any(String.class));
+
+    AbfsInputStream inputStream = getAbfsInputStream(client, "testFailedReadAheadEviction.txt");
+
+    // Add a failed buffer to completed queue and set to no free buffers to read ahead.
+    ReadBuffer buff = new ReadBuffer();
+    buff.setStatus(ReadBufferStatus.READ_FAILED);
+    ReadBufferManager.getBufferManager().testMimicFullUseAndAddFailedBuffer(buff);
+
+    // if read failed buffer eviction is tagged as a valid eviction, it will lead to
+    // wrong assumption of queue logic that a buffer is freed up and can lead to :
+    // java.util.EmptyStackException
+    // at java.util.Stack.peek(Stack.java:102)
+    // at java.util.Stack.pop(Stack.java:84)
+    // at org.apache.hadoop.fs.azurebfs.services.ReadBufferManager.queueReadAhead
+    ReadBufferManager.getBufferManager().queueReadAhead(inputStream, 0, ONE_KB);
+  }
+
   /**
+   *
    * The test expects AbfsInputStream to initiate a remote read request for
    * the request offset and length when previous read ahead on the offset had failed.
    * Also checks that the ReadBuffers are evicted as per the ReadBufferManager
@@ -264,12 +300,25 @@ public class TestAbfsInputStream extends
             any(String.class));
 
     AbfsInputStream inputStream = getAbfsInputStream(client, "testSuccessfulReadAhead.txt");
+    int beforeReadCompletedListSize = ReadBufferManager.getBufferManager().getCompletedReadListSize();
 
     // First read request that triggers readAheads.
     inputStream.read(new byte[ONE_KB]);
 
     // Only the 3 readAhead threads should have triggered client.read
     verifyReadCallCount(client, 3);
+    int newAdditionsToCompletedRead =
+        ReadBufferManager.getBufferManager().getCompletedReadListSize()
+            - beforeReadCompletedListSize;
+    // read buffer might be dumped if the ReadBufferManager getblock preceded
+    // the action of buffer being picked for reading from readaheadqueue, so that
+    // inputstream can proceed with read and not be blocked on readahead thread
+    // availability. So the count of buffers in completedReadQueue for the stream
+    // can be same or lesser than the requests triggered to queue readahead.
+    Assertions.assertThat(newAdditionsToCompletedRead)
+        .describedAs(
+            "New additions to completed reads should be same or less than as number of readaheads")
+        .isLessThanOrEqualTo(3);
 
     // Another read request whose requested data is already read ahead.
     inputStream.read(ONE_KB, new byte[ONE_KB], 0, ONE_KB);