@@ -18,22 +18,35 @@
 package org.apache.hadoop.fs.azurebfs.services;
 
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+import javax.xml.parsers.SAXParser;
+import javax.xml.parsers.SAXParserFactory;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.UnsupportedEncodingException;
 import java.net.HttpURLConnection;
 import java.net.URL;
-import java.net.URLDecoder;
 import java.net.URLEncoder;
 import java.nio.charset.CharacterCodingException;
 import java.nio.charset.Charset;
 import java.nio.charset.CharsetEncoder;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Hashtable;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
 import java.util.UUID;
 
+import org.w3c.dom.Document;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+import org.xml.sax.SAXException;
+
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.NotImplementedException;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
 import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore;
@@ -45,6 +58,11 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemExc
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAbfsRestOperationException;
 import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
 import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
+import org.apache.hadoop.fs.azurebfs.contracts.services.BlobListResultEntrySchema;
+import org.apache.hadoop.fs.azurebfs.contracts.services.BlobListResultSchema;
+import org.apache.hadoop.fs.azurebfs.contracts.services.BlobListXmlParser;
+import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema;
+import org.apache.hadoop.fs.azurebfs.contracts.services.StorageErrorResponseSchema;
 import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider;
 import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider;
 import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
@@ -52,6 +70,7 @@ import org.apache.hadoop.fs.azurebfs.security.ContextEncryptionAdapter;
 import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
 
 import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
+import static java.net.HttpURLConnection.HTTP_OK;
 import static java.net.HttpURLConnection.HTTP_PRECON_FAILED;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ACQUIRE_LEASE_ACTION;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPLICATION_JSON;
@@ -65,18 +84,29 @@ import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COMMA;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CONTAINER;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DEFAULT_LEASE_BREAK_PERIOD;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FORWARD_SLASH;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_DELETE;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_GET;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_HEAD;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.LEASE;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.LIST;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.METADATA;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.RELEASE_LEASE_ACTION;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.RENEW_LEASE_ACTION;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SINGLE_WHITE_SPACE;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.STAR;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.TRUE;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XML_TAG_BLOB_ERROR_CODE_END_XML;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XML_TAG_BLOB_ERROR_CODE_START_XML;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XML_TAG_BLOB_ERROR_MESSAGE_END_XML;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XML_TAG_BLOB_ERROR_MESSAGE_START_XML;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XML_TAG_BLOCK_NAME;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XML_TAG_COMMITTED_BLOCKS;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XML_TAG_HDI_ISFOLDER;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XML_TAG_NAME;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XMS_PROPERTIES_ENCODING_ASCII;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XMS_PROPERTIES_ENCODING_UNICODE;
 import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.ACCEPT;
@@ -86,6 +116,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.C
 import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.EXPECT;
 import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.IF_MATCH;
 import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.IF_NONE_MATCH;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.LAST_MODIFIED;
 import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.RANGE;
 import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.USER_AGENT;
 import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_BLOB_CONTENT_MD5;
@@ -103,6 +134,11 @@ import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARA
 import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_BLOCKLISTTYPE;
 import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_CLOSE;
 import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_COMP;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_DELIMITER;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_INCLUDE;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_MARKER;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_MAX_RESULTS;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_PREFIX;
 import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_RESTYPE;
 
 /**
@@ -271,14 +307,56 @@ public class AbfsBlobClient extends AbfsClient {
    * @return executed rest operation containing response from server.
    * @throws AzureBlobFileSystemException if rest operation or response parsing fails.
    */
-  @Override
-  public AbfsRestOperation listPath(final String relativePath,
-      final boolean recursive,
-      final int listMaxResults,
-      final String continuation,
-      TracingContext tracingContext) throws AzureBlobFileSystemException {
-    // TODO: [FnsOverBlob][HADOOP-19207] To be implemented as part of response handling of blob endpoint APIs.
-    throw new NotImplementedException("Blob Endpoint Support is not yet implemented");
+  public AbfsRestOperation listPath(final String relativePath, final boolean recursive,
+      final int listMaxResults, final String continuation, TracingContext tracingContext)
+      throws IOException {
+    return listPath(relativePath, recursive, listMaxResults, continuation, tracingContext, true);
+  }
+
+  public AbfsRestOperation listPath(final String relativePath, final boolean recursive,
+      final int listMaxResults, final String continuation, TracingContext tracingContext,
+      boolean is404CheckRequired) throws AzureBlobFileSystemException {
+    final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+
+    AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+    abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESTYPE, CONTAINER);
+    abfsUriQueryBuilder.addQuery(QUERY_PARAM_COMP, LIST);
+    abfsUriQueryBuilder.addQuery(QUERY_PARAM_INCLUDE, METADATA);
+    abfsUriQueryBuilder.addQuery(QUERY_PARAM_PREFIX, getDirectoryQueryParameter(relativePath));
+    abfsUriQueryBuilder.addQuery(QUERY_PARAM_MARKER, continuation);
+    if (!recursive) {
+      abfsUriQueryBuilder.addQuery(QUERY_PARAM_DELIMITER, FORWARD_SLASH);
+    }
+    abfsUriQueryBuilder.addQuery(QUERY_PARAM_MAX_RESULTS, String.valueOf(listMaxResults));
+    appendSASTokenToQuery(relativePath, SASTokenProvider.LIST_OPERATION, abfsUriQueryBuilder);
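+    // For example, a non-recursive listing under "dir1" resolves to a request of the form
+    // (query parameter names per the Blob endpoint List Blobs REST API):
+    //   https://{account}.blob.core.windows.net/{container}?restype=container&comp=list
+    //       &include=metadata&prefix=dir1/&delimiter=/&maxresults={listMaxResults}&marker={continuation}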
+
+    final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
+    final AbfsRestOperation op = getAbfsRestOperation(
+        AbfsRestOperationType.ListBlobs,
+        HTTP_METHOD_GET,
+        url,
+        requestHeaders);
+
+    op.execute(tracingContext);
+    if (isEmptyListResults(op.getResult()) && is404CheckRequired) {
+      // If the list operation returns no paths, we need to check if the path is a file.
+      // If it is a file, we need to return the file in the list.
+      // If it is a non-existing path, we need to throw a FileNotFoundException.
+      if (relativePath.equals(ROOT_PATH)) {
+        // Root always exists as a directory. It can be an empty listing.
+        return op;
+      }
+      AbfsRestOperation pathStatus = this.getPathStatus(relativePath, tracingContext, null, false);
+      BlobListResultSchema listResultSchema = getListResultSchemaFromPathStatus(relativePath, pathStatus);
+      AbfsRestOperation listOp = getAbfsRestOperation(
+          AbfsRestOperationType.ListBlobs,
+          HTTP_METHOD_GET,
+          url,
+          requestHeaders);
+      listOp.hardSetGetListStatusResult(HTTP_OK, listResultSchema);
+      return listOp;
+    }
+    return op;
   }
 
   /**
@@ -299,7 +377,7 @@ public class AbfsBlobClient extends AbfsClient {
       final String eTag,
       final ContextEncryptionAdapter contextEncryptionAdapter,
       final TracingContext tracingContext) throws AzureBlobFileSystemException {
-    // TODO: [FnsOverBlob][HADOOP-19207] To be implemented as part of response handling of blob endpoint APIs.
+    // TODO: [FnsOverBlob][HADOOP-19232] To be implemented as part of ingress support.
     throw new NotImplementedException("Create Path operation on Blob endpoint yet to be implemented.");
   }
 
@@ -668,6 +746,25 @@
         AbfsRestOperationType.SetPathProperties,
         HTTP_METHOD_PUT, url, requestHeaders);
-    op.execute(tracingContext);
+    try {
+      op.execute(tracingContext);
+    } catch (AbfsRestOperationException ex) {
+      // If we have no HTTP response, throw the original exception.
+      if (!op.hasResult()) {
+        throw ex;
+      }
+      // This path could be present as an implicit directory in FNS.
+      if (op.getResult().getStatusCode() == HTTP_NOT_FOUND && isNonEmptyListing(path, tracingContext)) {
+        // Implicit path found, create a marker blob at this path and set properties.
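+        // (A marker blob is an empty blob whose "hdi_isfolder" metadata is set to true, which is
+        // how a directory is represented on an FNS account; see XML_TAG_HDI_ISFOLDER below.)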
+        this.createPath(path, false, false, null, false, null, contextEncryptionAdapter, tracingContext);
+        // Make sure hdi_isFolder is added to the list of properties to be set.
+        boolean hdiIsFolderExists = properties.containsKey(XML_TAG_HDI_ISFOLDER);
+        if (!hdiIsFolderExists) {
+          properties.put(XML_TAG_HDI_ISFOLDER, TRUE);
+        }
+        return this.setPathProperties(path, properties, tracingContext, contextEncryptionAdapter);
+      }
+      throw ex;
+    }
     return op;
   }
 
@@ -719,7 +817,7 @@ public class AbfsBlobClient extends AbfsClient {
 
     final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
     final AbfsRestOperation op = getAbfsRestOperation(
-        AbfsRestOperationType.GetPathStatus,
+        AbfsRestOperationType.GetBlobProperties,
         HTTP_METHOD_HEAD, url, requestHeaders);
     try {
       op.execute(tracingContext);
@@ -728,9 +826,15 @@ public class AbfsBlobClient extends AbfsClient {
       if (!op.hasResult()) {
         throw ex;
       }
-      if (op.getResult().getStatusCode() == HTTP_NOT_FOUND && isImplicitCheckRequired) {
-        // This path could be present as an implicit directory in FNS.
-        // TODO: [FnsOverBlob][HADOOP-19207] To be implemented as part of implicit directory handling over blob endpoint.
+      // This path could be present as an implicit directory in FNS.
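+      // For example, if only the blob "dir1/file.txt" exists and there is no marker blob for
+      // "dir1", a HEAD on "dir1" returns 404 even though the path is a valid (implicit)
+      // directory; a one-entry listing with the prefix "dir1/" is used to detect that case.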
+      if (op.getResult().getStatusCode() == HTTP_NOT_FOUND
+          && isImplicitCheckRequired && isNonEmptyListing(path, tracingContext)) {
+        // Implicit path found.
+        AbfsRestOperation successOp = getAbfsRestOperation(
+            AbfsRestOperationType.GetPathStatus,
+            HTTP_METHOD_HEAD, url, requestHeaders);
+        successOp.hardSetGetFileStatusResult(HTTP_OK);
+        return successOp;
       }
       if (op.getResult().getStatusCode() == HTTP_NOT_FOUND) {
         /*
@@ -913,31 +1017,6 @@ public class AbfsBlobClient extends AbfsClient {
         "CheckAccess operation is only supported on HNS enabled Accounts.");
   }
 
-  /**
-   * Checks if the rest operation results indicate if the path is a directory.
-   * @param result executed rest operation containing response from server.
-   * @return True if the path is a directory, False otherwise.
-   */
-  @Override
-  public boolean checkIsDir(AbfsHttpOperation result) {
-    String resourceType = result.getResponseHeader(X_MS_META_HDI_ISFOLDER);
-    return resourceType != null && resourceType.equals(TRUE);
-  }
-
-  /**
-   * Returns true if the status code lies in the range of user error.
-   * In the case of HTTP_CONFLICT for PutBlockList we fall back to DFS and hence
-   * this retry handling is not needed.
-   * @param responseStatusCode http response status code.
-   * @return True or False.
-   */
-  @Override
-  public boolean checkUserError(int responseStatusCode) {
-    return (responseStatusCode >= HttpURLConnection.HTTP_BAD_REQUEST
-        && responseStatusCode < HttpURLConnection.HTTP_INTERNAL_ERROR
-        && responseStatusCode != HttpURLConnection.HTTP_CONFLICT);
-  }
-
   /**
    * Get Rest Operation for API
    * <a href="../../../../site/markdown/blobEndpoint.md#get-block-list">Get Block List</a>.
@@ -1004,8 +1083,10 @@ public class AbfsBlobClient extends AbfsClient {
     requestHeaders.add(new AbfsHttpHeader(X_MS_COPY_SOURCE, sourcePathUrl));
     requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, STAR));
 
-    return getAbfsRestOperation(AbfsRestOperationType.CopyBlob, HTTP_METHOD_PUT,
+    final AbfsRestOperation op = getAbfsRestOperation(AbfsRestOperationType.CopyBlob, HTTP_METHOD_PUT,
         url, requestHeaders);
+    op.execute(tracingContext);
+    return op;
   }
 
   /**
@@ -1037,16 +1118,204 @@ public class AbfsBlobClient extends AbfsClient {
     return op;
   }
 
-  private static String encodeMetadataAttribute(String value)
-      throws UnsupportedEncodingException {
-    return value == null ? null
-        : URLEncoder.encode(value, XMS_PROPERTIES_ENCODING_UNICODE);
+  /**
+   * Checks if the rest operation results indicate if the path is a directory.
+   * @param result executed rest operation containing response from server.
+   * @return True if the path is a directory, False otherwise.
+   */
+  @Override
+  public boolean checkIsDir(AbfsHttpOperation result) {
+    String resourceType = result.getResponseHeader(X_MS_META_HDI_ISFOLDER);
+    return resourceType != null && resourceType.equals(TRUE);
   }
 
-  private static String decodeMetadataAttribute(String encoded)
-      throws UnsupportedEncodingException {
-    return encoded == null ? null
-        : URLDecoder.decode(encoded, XMS_PROPERTIES_ENCODING_UNICODE);
+  /**
+   * Returns true if the status code lies in the range of user error.
+   * In the case of HTTP_CONFLICT for PutBlockList we fall back to DFS and hence
+   * this retry handling is not needed.
+   * @param responseStatusCode http response status code.
+   * @return True or False.
+   */
+  @Override
+  public boolean checkUserError(int responseStatusCode) {
+    return (responseStatusCode >= HttpURLConnection.HTTP_BAD_REQUEST
+        && responseStatusCode < HttpURLConnection.HTTP_INTERNAL_ERROR
+        && responseStatusCode != HttpURLConnection.HTTP_CONFLICT);
+  }
+
+  /**
+   * Get the continuation token from the response from BLOB Endpoint Listing.
+   * Continuation Token will be present in XML List response body.
+   * @param result The response from the server.
+   * @return The continuation token.
+   */
+  @Override
+  public String getContinuationFromResponse(AbfsHttpOperation result) {
+    BlobListResultSchema listResultSchema = (BlobListResultSchema) result.getListResultSchema();
+    return listResultSchema.getNextMarker();
+  }
+
+  /**
+   * Get the User-defined metadata on a path from response headers of
+   * GetBlobProperties API on Blob Endpoint.
+   * Blob Endpoint returns each metadata as a separate header.
+   * @param result The response of GetBlobProperties call from the server.
+   * @return Hashtable containing metadata key-value pairs.
+   * @throws InvalidAbfsRestOperationException if parsing fails.
+   */
+  @Override
+  public Hashtable<String, String> getXMSProperties(AbfsHttpOperation result)
+      throws InvalidAbfsRestOperationException {
+    Hashtable<String, String> properties = new Hashtable<>();
+    Map<String, List<String>> responseHeaders = result.getResponseHeaders();
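+    // e.g. a response header "x-ms-meta-key1: value1" becomes the entry key1 -> value1
+    // (the metadata prefix is stripped and the value is URL-decoded).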
+    for (Map.Entry<String, List<String>> entry : responseHeaders.entrySet()) {
+      String name = entry.getKey();
+      if (name != null && name.startsWith(X_MS_METADATA_PREFIX)) {
+        String value;
+        try {
+          value = decodeMetadataAttribute(entry.getValue().get(0));
+        } catch (UnsupportedEncodingException e) {
+          throw new InvalidAbfsRestOperationException(e);
+        }
+        properties.put(name.substring(X_MS_METADATA_PREFIX.length()), value);
+      }
+    }
+    return properties;
+  }
+
+  /**
+   * Parse the XML response body returned by the ListBlobs API on Blob Endpoint.
+   * @param stream InputStream containing the response from the server.
+   * @return BlobListResultSchema containing the list of entries.
+   * @throws IOException if parsing fails.
+   */
+  @Override
+  public ListResultSchema parseListPathResults(final InputStream stream) throws IOException {
+    if (stream == null) {
+      return null;
+    }
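+    // The body follows the List Blobs response format, roughly:
+    //   <EnumerationResults><Blobs><Blob>...</Blob><BlobPrefix>...</BlobPrefix></Blobs>
+    //   <NextMarker>...</NextMarker></EnumerationResults>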
+    BlobListResultSchema listResultSchema;
+    try {
+      final SAXParser saxParser = saxParserThreadLocal.get();
+      saxParser.reset();
+      listResultSchema = new BlobListResultSchema();
+      saxParser.parse(stream, new BlobListXmlParser(listResultSchema, getBaseUrl().toString()));
+    } catch (SAXException | IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    return removeDuplicateEntries(listResultSchema);
+  }
+
+  /**
+   * Parse the XML response body returned by the GetBlockList API on Blob Endpoint.
+   * @param stream InputStream containing the response from the server.
+   * @return List of blockIds.
+   * @throws IOException if parsing fails.
+   */
+  @Override
+  public List<String> parseBlockListResponse(final InputStream stream) throws IOException {
+    List<String> blockIdList = new ArrayList<>();
+    // Convert the input stream to a Document object
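+    // The Get Block List response body typically looks like:
+    //   <?xml version="1.0" encoding="utf-8"?>
+    //   <BlockList>
+    //     <CommittedBlocks>
+    //       <Block><Name>{base64-block-id}</Name><Size>{size}</Size></Block>
+    //     </CommittedBlocks>
+    //   </BlockList>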
+
+    try {
+      DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
+      Document doc = factory.newDocumentBuilder().parse(stream);
+
+      // Find the CommittedBlocks element and extract the list of block IDs
+      NodeList committedBlocksList = doc.getElementsByTagName(
+          XML_TAG_COMMITTED_BLOCKS);
+      if (committedBlocksList.getLength() > 0) {
+        Node committedBlocks = committedBlocksList.item(0);
+        NodeList blockList = committedBlocks.getChildNodes();
+        for (int i = 0; i < blockList.getLength(); i++) {
+          Node block = blockList.item(i);
+          if (block.getNodeName().equals(XML_TAG_BLOCK_NAME)) {
+            NodeList nameList = block.getChildNodes();
+            for (int j = 0; j < nameList.getLength(); j++) {
+              Node name = nameList.item(j);
+              if (name.getNodeName().equals(XML_TAG_NAME)) {
+                String blockId = name.getTextContent();
+                blockIdList.add(blockId);
+              }
+            }
+          }
+        }
+      }
+    } catch (ParserConfigurationException | SAXException e) {
+      throw new IOException(e);
+    }
+
+    return blockIdList;
+  }
+
+  /**
+   * Parse the XML response body returned on the error stream for all Blob endpoint APIs.
+   * @param stream ErrorStream containing the response from the server.
+   * @return StorageErrorResponseSchema containing the error code and message.
+   * @throws IOException if parsing fails.
+   */
+  @Override
+  public StorageErrorResponseSchema processStorageErrorResponse(final InputStream stream) throws IOException {
+    final String data = IOUtils.toString(stream, StandardCharsets.UTF_8);
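+    // A Blob endpoint error body is typically of the form:
+    //   <?xml version="1.0" encoding="utf-8"?>
+    //   <Error><Code>BlobNotFound</Code><Message>The specified blob does not exist. ...</Message></Error>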
+    String storageErrorCode = EMPTY_STRING;
+    String storageErrorMessage = EMPTY_STRING;
+    String expectedAppendPos = EMPTY_STRING;
+    int codeStartFirstInstance = data.indexOf(XML_TAG_BLOB_ERROR_CODE_START_XML);
+    int codeEndFirstInstance = data.indexOf(XML_TAG_BLOB_ERROR_CODE_END_XML);
+    if (codeEndFirstInstance != -1 && codeStartFirstInstance != -1) {
+      storageErrorCode = data.substring(codeStartFirstInstance,
+          codeEndFirstInstance).replace(XML_TAG_BLOB_ERROR_CODE_START_XML, "");
+    }
+
+    int msgStartFirstInstance = data.indexOf(XML_TAG_BLOB_ERROR_MESSAGE_START_XML);
+    int msgEndFirstInstance = data.indexOf(XML_TAG_BLOB_ERROR_MESSAGE_END_XML);
+    if (msgEndFirstInstance != -1 && msgStartFirstInstance != -1) {
+      storageErrorMessage = data.substring(msgStartFirstInstance,
+          msgEndFirstInstance).replace(XML_TAG_BLOB_ERROR_MESSAGE_START_XML, "");
+    }
+    return new StorageErrorResponseSchema(storageErrorCode, storageErrorMessage, expectedAppendPos);
+  }
+
+  /**
+   * Encode the value of the attribute to be set as metadata.
+   * Blob Endpoint supports Unicode characters in metadata values.
+   * @param value to be encoded.
+   * @return encoded value.
+   * @throws UnsupportedEncodingException if encoding fails.
+   */
+  @Override
+  public byte[] encodeAttribute(String value) throws UnsupportedEncodingException {
+    return value.getBytes(XMS_PROPERTIES_ENCODING_UNICODE);
+  }
+
+  /**
+   * Decode the value of the attribute from the metadata.
+   * Blob Endpoint supports Unicode characters in metadata values.
+   * @param value to be decoded.
+   * @return decoded value.
+   * @throws UnsupportedEncodingException if decoding fails.
+   */
+  @Override
+  public String decodeAttribute(byte[] value) throws UnsupportedEncodingException {
+    return new String(value, XMS_PROPERTIES_ENCODING_UNICODE);
+  }
+
+  /**
+   * Blob Endpoint supports delimiter-based listing where the
+   * directory path to be listed must end with a forward slash.
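+   * For example, a relativePath of "dir1/dir2" yields the query prefix "dir1/dir2/".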
+   * @param path directory path to be listed.
+   * @return directory path with forward slash at end.
+   */
+  public static String getDirectoryQueryParameter(final String path) {
+    String directory = AbfsClient.getDirectoryQueryParameter(path);
+    if (directory.isEmpty()) {
+      return directory;
+    }
+    if (!directory.endsWith(FORWARD_SLASH)) {
+      directory = directory + FORWARD_SLASH;
+    }
+    return directory;
   }
 
   /**
@@ -1066,6 +1335,14 @@ public class AbfsBlobClient extends AbfsClient {
     return true;
   }
 
+  /**
+   * Get the list of headers to be set for metadata properties.
+   * Blob Endpoint accepts each metadata as a separate header.
+   * @param properties to be set as metadata
+   * @return List of headers to be set.
+   * @throws AbfsRestOperationException if encoding fails.
+   * @throws CharacterCodingException if value is not pure ASCII.
+   */
   private List<AbfsHttpHeader> getMetadataHeadersList(final Hashtable<String, String> properties)
       throws AbfsRestOperationException, CharacterCodingException {
     List<AbfsHttpHeader> metadataRequestHeaders = new ArrayList<>();
@@ -1084,4 +1361,112 @@ public class AbfsBlobClient extends AbfsClient {
     }
     return metadataRequestHeaders;
   }
+
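+  // SAXParser instances are not thread-safe; keep one parser per thread and reset() it before
+  // each use (parseListPathResults calls reset() before parsing).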
+  private final ThreadLocal<SAXParser> saxParserThreadLocal = ThreadLocal.withInitial(() -> {
+    SAXParserFactory factory = SAXParserFactory.newInstance();
+    factory.setNamespaceAware(true);
+    try {
+      return factory.newSAXParser();
+    } catch (SAXException e) {
+      throw new RuntimeException("Unable to create SAXParser", e);
+    } catch (ParserConfigurationException e) {
+      throw new RuntimeException("Check parser configuration", e);
+    }
+  });
+
+  /**
+   * This is to handle duplicate listing entries returned by Blob Endpoint for
+   * implicit paths that also have a marker file created for them.
+   * This will retain the entry corresponding to the marker file and remove the BlobPrefix entry.
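+   * For example, a directory "dir1" backed by a marker blob shows up both as a Blob entry
+   * (with an ETag) and as a BlobPrefix entry; only the Blob entry is kept.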
+   * @param listResultSchema List of entries returned by Blob Endpoint.
+   * @return List of entries after removing duplicates.
+   */
+  private BlobListResultSchema removeDuplicateEntries(BlobListResultSchema listResultSchema) {
+    List<BlobListResultEntrySchema> uniqueEntries = new ArrayList<>();
+    TreeMap<String, BlobListResultEntrySchema> nameToEntryMap = new TreeMap<>();
+
+    for (BlobListResultEntrySchema entry : listResultSchema.paths()) {
+      if (StringUtils.isNotEmpty(entry.eTag())) {
+        // This is a blob entry. It is either a file or a marker blob.
+        // In both cases we will add this.
+        nameToEntryMap.put(entry.name(), entry);
+      } else {
+        // This is a BlobPrefix entry. It is a directory with files inside it.
+        // This might have already been added as a marker blob.
+        if (!nameToEntryMap.containsKey(entry.name())) {
+          nameToEntryMap.put(entry.name(), entry);
+        }
+      }
+    }
+
+    uniqueEntries.addAll(nameToEntryMap.values());
+    listResultSchema.withPaths(uniqueEntries);
+    return listResultSchema;
+  }
+
+  /**
+   * When listing is done on a file, Blob Endpoint returns an empty listing
+   * but DFS Endpoint returns the file status as one of the entries.
+   * This is to convert the file status into a ListResultSchema.
+   * @param relativePath path on which the listing was attempted.
+   * @param pathStatus result of the GetPathStatus call on that path.
+   * @return BlobListResultSchema containing the single file entry, or an empty schema
+   *         if the path is an explicit directory.
+   */
+  private BlobListResultSchema getListResultSchemaFromPathStatus(String relativePath, AbfsRestOperation pathStatus) {
+    BlobListResultSchema listResultSchema = new BlobListResultSchema();
+
+    BlobListResultEntrySchema entrySchema = new BlobListResultEntrySchema();
+    entrySchema.setUrl(pathStatus.getUrl().toString());
+    entrySchema.setPath(new Path(relativePath));
+    entrySchema.setName(relativePath.charAt(0) == '/' ? relativePath.substring(1) : relativePath);
+    entrySchema.setIsDirectory(checkIsDir(pathStatus.getResult()));
+    entrySchema.setContentLength(Long.parseLong(pathStatus.getResult().getResponseHeader(CONTENT_LENGTH)));
+    entrySchema.setLastModifiedTime(
+        pathStatus.getResult().getResponseHeader(LAST_MODIFIED));
+    entrySchema.setETag(AzureBlobFileSystemStore.extractEtagHeader(pathStatus.getResult()));
+
+    // If listing is done on an explicit directory, do not include the directory in the listing.
+    if (!entrySchema.isDirectory()) {
+      listResultSchema.paths().add(entrySchema);
+    }
+    return listResultSchema;
+  }
+
+  private static String encodeMetadataAttribute(String value)
+      throws UnsupportedEncodingException {
+    return value == null ? null
+        : URLEncoder.encode(value, StandardCharsets.UTF_8.name());
+  }
+
+  private static String decodeMetadataAttribute(String encoded)
+      throws UnsupportedEncodingException {
+    return encoded == null ? null
+        : java.net.URLDecoder.decode(encoded, StandardCharsets.UTF_8.name());
+  }
+
+  private boolean isNonEmptyListing(String path,
+      TracingContext tracingContext) throws AzureBlobFileSystemException {
+    AbfsRestOperation listOp = listPath(path, false, 1, null, tracingContext, false);
+    return !isEmptyListResults(listOp.getResult());
+  }
+
+  /**
+   * Check if the list call returned empty results without any continuation token.
+   * @param result The response of the listing API from the server.
+   * @return True if empty results without continuation token.
+   */
+  private boolean isEmptyListResults(AbfsHttpOperation result) {
+    boolean isEmptyList = result != null && result.getStatusCode() == HTTP_OK && // List Call was successful
+        result.getListResultSchema() != null && // Parsing of list response was successful
+        result.getListResultSchema().paths().isEmpty() && // No paths were returned
+        result.getListResultSchema() instanceof BlobListResultSchema && // It is safe to typecast to BlobListResultSchema
+        ((BlobListResultSchema) result.getListResultSchema()).getNextMarker() == null; // No continuation token was returned
+    if (isEmptyList) {
+      LOG.debug("List call returned empty results without any continuation token.");
+      return true;
+    } else if (result != null && !(result.getListResultSchema() instanceof BlobListResultSchema)) {
+      throw new RuntimeException("List call returned unexpected results over Blob Endpoint.");
+    }
+    return false;
+  }
 }