
HADOOP-19531. [ABFS][FnsOverBlob] Streaming List Path Result Should Happen Inside Retry Loop (#7582)

Contributed by Anuj Modi
Reviewed by Anmol Asrani, Manish Bhatt, Manika Joshi

Signed-off-by: Anuj Modi <anujmodi@apache.org>
Anuj Modi, 3 weeks ago
parent commit 0dac3d2050
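
In brief: before this change, both clients parsed list-path results from the live network stream after the retry loop had already finished, so a connection reset or timeout surfacing mid-parse could not be retried. The patch drains the response body into an in-memory buffer while still inside the retry loop (AbfsHttpOperation.processResponse) and parses from that copy, tightening exception translation in AbfsBlobClient and AbfsDfsClient along the way.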

+ 21 - 7
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java

@@ -38,6 +38,7 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsApacheHttpExpect10
 import org.apache.http.Header;
 import org.apache.http.HttpEntity;
 import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpDelete;
 import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
 import org.apache.http.client.methods.HttpGet;
@@ -47,6 +48,7 @@ import org.apache.http.client.methods.HttpPost;
 import org.apache.http.client.methods.HttpPut;
 import org.apache.http.client.methods.HttpRequestBase;
 import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.util.EntityUtils;
 
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APACHE_IMPL;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
@@ -192,14 +194,26 @@ public class AbfsAHCHttpOperation extends AbfsHttpOperation {
   public void processResponse(final byte[] buffer,
       final int offset,
       final int length) throws IOException {
-    if (!isPayloadRequest) {
-      prepareRequest();
-      LOG.debug("Sending request: {}", httpRequestBase);
-      httpResponse = executeRequest();
-      LOG.debug("Request sent: {}; response {}", httpRequestBase,
-          httpResponse);
+    try {
+      if (!isPayloadRequest) {
+        prepareRequest();
+        LOG.debug("Sending request: {}", httpRequestBase);
+        httpResponse = executeRequest();
+        LOG.debug("Request sent: {}; response {}", httpRequestBase,
+            httpResponse);
+      }
+      parseResponseHeaderAndBody(buffer, offset, length);
+    } finally {
+      if (httpResponse != null) {
+        try {
+          EntityUtils.consume(httpResponse.getEntity());
+        } finally {
+          if (httpResponse instanceof CloseableHttpResponse) {
+            ((CloseableHttpResponse) httpResponse).close();
+          }
+        }
+      }
     }
-    parseResponseHeaderAndBody(buffer, offset, length);
   }
 
   /**
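
The new try/finally follows the standard Apache HttpComponents cleanup contract: fully consume the response entity so the pooled connection can be reused, then close the response. A minimal sketch of that pattern in isolation; the class and method names are illustrative, not part of the patch:

    import java.io.IOException;

    import org.apache.http.HttpResponse;
    import org.apache.http.client.methods.CloseableHttpResponse;
    import org.apache.http.util.EntityUtils;

    final class ResponseCleanup {
      // Drain and release an HTTP response, mirroring the finally block above.
      static void drainAndClose(HttpResponse response) throws IOException {
        if (response == null) {
          return;
        }
        try {
          // Consuming the entity reads any remaining bytes so the
          // underlying connection can go back to the keep-alive pool.
          EntityUtils.consume(response.getEntity());
        } finally {
          // The field is typed as HttpResponse; only the closeable
          // subtype actually exposes close().
          if (response instanceof CloseableHttpResponse) {
            ((CloseableHttpResponse) response).close();
          }
        }
      }
    }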

+ 14 - 17
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java

@@ -1605,12 +1605,9 @@ public class AbfsBlobClient extends AbfsClient {
   @Override
   public ListResponseData parseListPathResults(AbfsHttpOperation result, URI uri)
       throws AzureBlobFileSystemException {
-    BlobListResultSchema listResultSchema;
     try (InputStream stream = result.getListResultStream()) {
-      if (stream == null) {
-        return null;
-      }
       try {
+        BlobListResultSchema listResultSchema;
         final SAXParser saxParser = saxParserThreadLocal.get();
         saxParser.reset();
         listResultSchema = new BlobListResultSchema();
@@ -1620,19 +1617,17 @@ public class AbfsBlobClient extends AbfsClient {
         LOG.debug("ListBlobs listed {} blobs with {} as continuation token",
             listResultSchema.paths().size(),
             listResultSchema.getNextMarker());
-      } catch (SAXException | IOException e) {
-        throw new AbfsDriverException(e);
+        return filterDuplicateEntriesAndRenamePendingFiles(listResultSchema, uri);
+      } catch (SAXException | IOException ex) {
+        throw new AbfsDriverException(ERR_BLOB_LIST_PARSING, ex);
       }
-    } catch (IOException e) {
-      LOG.error("Unable to deserialize list results for uri {}", uri.toString(), e);
-      throw new AbfsDriverException(ERR_BLOB_LIST_PARSING, e);
-    }
-
-    try {
-      return filterDuplicateEntriesAndRenamePendingFiles(listResultSchema, uri);
-    } catch (IOException e) {
-      LOG.error("Unable to filter list results for uri {}", uri.toString(), e);
-      throw new AbfsDriverException(ERR_BLOB_LIST_PARSING, e);
+    } catch (AbfsDriverException ex) {
+      // Throw as it is to avoid multiple wrapping.
+      LOG.error("Unable to deserialize list results for Uri {}", uri != null ? uri.toString(): "NULL", ex);
+      throw ex;
+    } catch (Exception ex) {
+      LOG.error("Unable to get stream for list results for uri {}", uri != null ? uri.toString(): "NULL", ex);
+      throw new AbfsDriverException(ERR_BLOB_LIST_PARSING, ex);
     }
   }
 
@@ -1929,8 +1924,10 @@ public class AbfsBlobClient extends AbfsClient {
    * @param listResultSchema List of entries returned by Blob Endpoint.
    * @param uri URI to be used for path conversion.
    * @return List of entries after removing duplicates.
+   * @throws IOException if path conversion fails.
    */
-  private ListResponseData filterDuplicateEntriesAndRenamePendingFiles(
+  @VisibleForTesting
+  public ListResponseData filterDuplicateEntriesAndRenamePendingFiles(
       BlobListResultSchema listResultSchema, URI uri) throws IOException {
     List<FileStatus> fileStatuses = new ArrayList<>();
     Map<Path, Integer> renamePendingJsonPaths = new HashMap<>();
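
The restructured catch chain rethrows an already-typed AbfsDriverException unchanged and wraps everything else exactly once, so callers never see the driver exception nested inside itself. A self-contained sketch of that wrap-once pattern, using a stand-in exception type rather than the real Hadoop classes:

    import java.util.concurrent.Callable;

    // Illustrative stand-in for AbfsDriverException; not the real Hadoop class.
    class DriverException extends Exception {
      DriverException(String message, Throwable cause) {
        super(message, cause);
      }
    }

    final class WrapOnce {
      static final String ERR_LIST_PARSING = "Parsing of the list response failed.";

      // Wrap unexpected failures exactly once; rethrow already-wrapped ones.
      static <T> T run(Callable<T> parser) throws DriverException {
        try {
          return parser.call();
        } catch (DriverException ex) {
          // Already the driver's exception type: rethrow as-is to avoid
          // multiple wrapping, exactly as the patched catch block does.
          throw ex;
        } catch (Exception ex) {
          throw new DriverException(ERR_LIST_PARSING, ex);
        }
      }
    }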

+ 25 - 20
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java

@@ -38,8 +38,10 @@ import java.util.Map;
 import java.util.UUID;
 
 import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParseException;
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.JsonMappingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 
 import org.apache.hadoop.classification.VisibleForTesting;
@@ -1476,33 +1478,36 @@ public class AbfsDfsClient extends AbfsClient {
    * @throws AzureBlobFileSystemException if parsing fails.
    */
   @Override
-  public ListResponseData parseListPathResults(AbfsHttpOperation result, URI uri) throws AzureBlobFileSystemException {
-    try (InputStream listResultInputStream = result.getListResultStream()) {
-      DfsListResultSchema listResultSchema;
+  public ListResponseData parseListPathResults(AbfsHttpOperation result, URI uri)
+      throws AzureBlobFileSystemException {
+    try (InputStream stream = result.getListResultStream()) {
       try {
+        DfsListResultSchema listResultSchema;
         final ObjectMapper objectMapper = new ObjectMapper();
-        listResultSchema = objectMapper.readValue(listResultInputStream,
-            DfsListResultSchema.class);
+        listResultSchema = objectMapper.readValue(stream, DfsListResultSchema.class);
         result.setListResultSchema(listResultSchema);
         LOG.debug("ListPath listed {} paths with {} as continuation token",
             listResultSchema.paths().size(),
             getContinuationFromResponse(result));
-      } catch (IOException ex) {
-        throw new AbfsDriverException(ex);
-      }
-
-      List<FileStatus> fileStatuses = new ArrayList<>();
-      for (DfsListResultEntrySchema entry : listResultSchema.paths()) {
-        fileStatuses.add(getVersionedFileStatusFromEntry(entry, uri));
+        List<FileStatus> fileStatuses = new ArrayList<>();
+        for (DfsListResultEntrySchema entry : listResultSchema.paths()) {
+          fileStatuses.add(getVersionedFileStatusFromEntry(entry, uri));
+        }
+        ListResponseData listResponseData = new ListResponseData();
+        listResponseData.setFileStatusList(fileStatuses);
+        listResponseData.setRenamePendingJsonPaths(null);
+        listResponseData.setContinuationToken(
+            getContinuationFromResponse(result));
+        return listResponseData;
+      } catch (JsonParseException | JsonMappingException ex) {
+        throw new AbfsDriverException(ERR_DFS_LIST_PARSING, ex);
       }
-      ListResponseData listResponseData = new ListResponseData();
-      listResponseData.setFileStatusList(fileStatuses);
-      listResponseData.setRenamePendingJsonPaths(null);
-      listResponseData.setContinuationToken(
-          getContinuationFromResponse(result));
-      return listResponseData;
-    } catch (IOException ex) {
-      LOG.error("Unable to deserialize list results for Uri {}", uri.toString(), ex);
+    } catch (AbfsDriverException ex) {
+      // Throw as it is to avoid multiple wrapping.
+      LOG.error("Unable to deserialize list results for Uri {}", uri != null ? uri.toString(): "NULL", ex);
+      throw ex;
+    } catch (Exception ex) {
+      LOG.error("Unable to deserialize list results for Uri {}", uri != null ? uri.toString(): "NULL", ex);
       throw new AbfsDriverException(ERR_DFS_LIST_PARSING, ex);
     }
   }
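
On the DFS side the listing schema is JSON, so the patch narrows the wrapped exceptions to Jackson's parse and mapping failures, leaving transport-level IOExceptions to the outer handler. A self-contained sketch under that assumption; ListingSchema is a made-up stand-in for DfsListResultSchema:

    import java.io.IOException;
    import java.io.InputStream;
    import java.util.List;

    import com.fasterxml.jackson.core.JsonParseException;
    import com.fasterxml.jackson.databind.JsonMappingException;
    import com.fasterxml.jackson.databind.ObjectMapper;

    // Hypothetical schema class mirroring the shape of DfsListResultSchema.
    class ListingSchema {
      public List<String> paths;
    }

    final class ListingParser {
      // Malformed JSON surfaces as JsonParseException; JSON that does not
      // fit the schema surfaces as JsonMappingException. Both subclass
      // IOException, so catching them first keeps low-level I/O failures
      // (e.g. a connection reset mid-stream) on a separate path.
      static ListingSchema parse(InputStream stream) throws IOException {
        final ObjectMapper mapper = new ObjectMapper();
        try {
          return mapper.readValue(stream, ListingSchema.class);
        } catch (JsonParseException | JsonMappingException ex) {
          throw new IOException("List response does not match the schema", ex);
        }
      }
    }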

+ 29 - 4
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java

@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.fs.azurebfs.services;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.HttpURLConnection;
@@ -221,7 +223,7 @@ public abstract class AbfsHttpOperation implements AbfsPerfLoggable {
     return listResultSchema;
   }
 
-  public final InputStream getListResultStream() {
+  public InputStream getListResultStream() {
     return listResultStream;
   }
 
@@ -396,8 +398,7 @@ public abstract class AbfsHttpOperation implements AbfsPerfLoggable {
       // consume the input stream to release resources
       int totalBytesRead = 0;
 
-      try {
-        InputStream stream = getContentInputStream();
+      try (InputStream stream = getContentInputStream()) {
         if (isNullInputStream(stream)) {
           return;
         }
@@ -409,7 +410,7 @@ public abstract class AbfsHttpOperation implements AbfsPerfLoggable {
           if (url.toString().contains(QUERY_PARAM_COMP + EQUAL + BLOCKLIST)) {
             parseBlockListResponse(stream);
           } else {
-            listResultStream = stream;
+            parseListPathResponse(stream);
           }
         } else {
           if (buffer != null) {
@@ -438,6 +439,11 @@ public abstract class AbfsHttpOperation implements AbfsPerfLoggable {
             method, getMaskedUrl(), ex.getMessage());
         log.debug("IO Error: ", ex);
         throw ex;
+      } catch (Exception ex) {
+        log.warn("Unexpected error: {} {}: {}",
+            method, getMaskedUrl(), ex.getMessage());
+        log.debug("Unexpected Error: ", ex);
+        throw new IOException(ex);
       } finally {
         this.recvResponseTimeMs += elapsedTimeMs(startTime);
         this.bytesReceived = totalBytesRead;
@@ -500,6 +506,25 @@ public abstract class AbfsHttpOperation implements AbfsPerfLoggable {
     blockIdList = client.parseBlockListResponse(stream);
   }
 
+  /**
+   * Parse the list path response from the network stream and save response into a buffer.
+   * @param stream Network InputStream.
+   * @throws IOException if an error occurs while reading the stream.
+   */
+  private void parseListPathResponse(final InputStream stream) throws IOException {
+    if (stream == null || listResultStream != null) {
+      return;
+    }
+    try (ByteArrayOutputStream buffer = new ByteArrayOutputStream()) {
+      byte[] tempBuffer = new byte[CLEAN_UP_BUFFER_SIZE];
+      int bytesRead;
+      while ((bytesRead = stream.read(tempBuffer, 0, CLEAN_UP_BUFFER_SIZE)) != -1) {
+        buffer.write(tempBuffer, 0, bytesRead);
+      }
+      listResultStream = new ByteArrayInputStream(buffer.toByteArray());
+    }
+  }
+
   public List<String> getBlockIdList() {
     return blockIdList;
   }
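
This is the heart of HADOOP-19531: instead of handing the live network stream to the caller for parsing after the retry loop has exited, parseListPathResponse drains the body into an in-memory, replayable stream while execution is still inside the retry scope, so a network fault during the read is retried like any other request failure. A generic sketch of the buffering step, with a local constant standing in for CLEAN_UP_BUFFER_SIZE:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.InputStream;

    final class StreamBuffering {
      private static final int BUFFER_SIZE = 64 * 1024;

      // Drain the network stream into memory inside the retry scope;
      // the returned stream is replayable and cannot throw a network
      // error when it is parsed later.
      static InputStream toReplayableStream(InputStream network)
          throws IOException {
        try (ByteArrayOutputStream copy = new ByteArrayOutputStream()) {
          byte[] chunk = new byte[BUFFER_SIZE];
          int bytesRead;
          while ((bytesRead = network.read(chunk, 0, chunk.length)) != -1) {
            copy.write(chunk, 0, bytesRead);
          }
          return new ByteArrayInputStream(copy.toByteArray());
        }
      }
    }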

+ 37 - 1
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemListStatus.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.fs.azurebfs;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.net.SocketException;
 import java.net.SocketTimeoutException;
 import java.net.URL;
 import java.util.ArrayList;
@@ -42,6 +43,7 @@ import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
 import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsDriverException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
 import org.apache.hadoop.fs.azurebfs.services.AbfsBlobClient;
 import org.apache.hadoop.fs.azurebfs.services.AbfsClientHandler;
@@ -63,7 +65,10 @@ import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PAT
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.TRUE;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_LIST_MAX_RESULTS;
 import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_METADATA_PREFIX;
+import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_BLOB_LIST_PARSING;
 import static org.apache.hadoop.fs.azurebfs.services.RenameAtomicity.SUFFIX;
+import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_RESET_MESSAGE;
+import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_TIMEOUT_ABBREVIATION;
 import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_TIMEOUT_JDK_MESSAGE;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.assertMkdirs;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
@@ -130,7 +135,9 @@ public class ITestAzureBlobFileSystemListStatus extends
 
   /**
    * Test to verify that each paginated call to ListBlobs uses a new tracing context.
-   * @throws Exception
+   * Test also verifies that the retry policy is invoked on a SocketTimeoutException,
+   * and that an empty list with a valid continuation token is handled.
+   * @throws Exception if there is an error or a test assertion fails.
    */
   @Test
   public void testListPathTracingContext() throws Exception {
@@ -160,6 +167,10 @@ public class ITestAzureBlobFileSystemListStatus extends
     List<FileStatus> fileStatuses = new ArrayList<>();
     spiedStore.listStatus(new Path("/"), "", fileStatuses, true, null, spiedTracingContext);
 
+    // Assert that there were retries due to SocketTimeoutException
+    Mockito.verify(spiedClient, Mockito.times(1))
+        .getRetryPolicy(CONNECTION_TIMEOUT_ABBREVIATION);
+
    // Assert that 2 paginated ListPath calls were made: 1 and 2.
     // 1. Without continuation token
     Mockito.verify(spiedClient, times(1)).listPath(
@@ -176,6 +187,31 @@ public class ITestAzureBlobFileSystemListStatus extends
     Mockito.verify(spiedTracingContext, times(0)).constructHeader(any(), any(), any());
   }
 
+  @Test
+  public void testListPathParsingFailure() throws Exception {
+    assumeBlobServiceType();
+    AzureBlobFileSystem spiedFs = Mockito.spy(getFileSystem());
+    AzureBlobFileSystemStore spiedStore = Mockito.spy(spiedFs.getAbfsStore());
+    AbfsBlobClient spiedClient = Mockito.spy(spiedStore.getClientHandler()
+        .getBlobClient());
+    Mockito.doReturn(spiedStore).when(spiedFs).getAbfsStore();
+    Mockito.doReturn(spiedClient).when(spiedStore).getClient();
+
+    Mockito.doThrow(new SocketException(CONNECTION_RESET_MESSAGE)).when(spiedClient).filterDuplicateEntriesAndRenamePendingFiles(any(), any());
+    List<FileStatus> fileStatuses = new ArrayList<>();
+    AbfsDriverException ex = intercept(AbfsDriverException.class,
+      () -> {
+        spiedStore.listStatus(new Path("/"), "", fileStatuses,
+            true, null, getTestTracingContext(spiedFs, true));
+      });
+    Assertions.assertThat(ex.getStatusCode())
+        .describedAs("Expecting Network Error status code")
+        .isEqualTo(-1);
+    Assertions.assertThat(ex.getErrorMessage())
+        .describedAs("Expecting COPY_ABORTED error code")
+        .contains(ERR_BLOB_LIST_PARSING);
+  }
+
   /**
    * Creates a file, verifies that listStatus returns it,
    * even while the file is still open for writing.
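
The new testListPathParsingFailure injects its failure with a Mockito spy: doThrow makes the real filterDuplicateEntriesAndRenamePendingFiles fail with a connection reset, and intercept asserts the translated AbfsDriverException. A minimal sketch of the spy/doThrow half of that pattern, against a hypothetical interface rather than the real client:

    import static org.mockito.ArgumentMatchers.any;

    import java.net.SocketException;

    import org.mockito.Mockito;

    final class FaultInjectionSketch {

      // Hypothetical stand-in for the AbfsBlobClient method under test.
      interface ListingClient {
        Object filterEntries(Object schema, Object uri) throws SocketException;
      }

      // Return a spy that fails with a connection reset at the filtering
      // step, exercising the caller's exception-translation path.
      static ListingClient injectConnectionReset(ListingClient real)
          throws SocketException {
        ListingClient spy = Mockito.spy(real);
        Mockito.doThrow(new SocketException("Connection reset"))
            .when(spy).filterEntries(any(), any());
        return spy;
      }
    }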

+ 2 - 0
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestApacheClientConnectionPool.java

@@ -45,6 +45,7 @@ import org.apache.http.impl.conn.DefaultHttpClientConnectionOperator;
 
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COLON;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.KEEP_ALIVE_CACHE_CLOSED;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_METRIC_FORMAT;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_NETWORKING_LIBRARY;
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.HTTPS_SCHEME;
 import static org.apache.hadoop.fs.azurebfs.constants.HttpOperationType.APACHE_HTTP_CLIENT;
@@ -67,6 +68,7 @@ public class ITestApacheClientConnectionPool extends
   public void testKacIsClosed() throws Throwable {
     Configuration configuration = new Configuration(getRawConfiguration());
     configuration.set(FS_AZURE_NETWORKING_LIBRARY, APACHE_HTTP_CLIENT.name());
+    configuration.unset(FS_AZURE_METRIC_FORMAT);
     try (AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(
         configuration)) {
       KeepAliveCache kac = fs.getAbfsStore().getClientHandler().getIngressClient()

+ 1 - 0
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java

@@ -57,6 +57,7 @@ public class TestAbfsClient {
     public void testTimerInitializationWithoutMetricCollection() throws Exception {
         final Configuration configuration = new Configuration();
         AbfsConfiguration abfsConfiguration = new AbfsConfiguration(configuration, ACCOUNT_NAME);
+        abfsConfiguration.unset(FS_AZURE_METRIC_FORMAT);
 
         AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new URI("abcd")));
         AbfsClientContext abfsClientContext = new AbfsClientContextBuilder().withAbfsCounters(abfsCounters).build();