|
@@ -27,6 +27,7 @@ import java.io.IOException;
|
|
import java.io.InputStream;
|
|
import java.io.InputStream;
|
|
import java.io.UnsupportedEncodingException;
|
|
import java.io.UnsupportedEncodingException;
|
|
import java.net.HttpURLConnection;
|
|
import java.net.HttpURLConnection;
|
|
|
|
+import java.net.URI;
|
|
import java.net.URL;
|
|
import java.net.URL;
|
|
import java.net.URLDecoder;
|
|
import java.net.URLDecoder;
|
|
import java.net.URLEncoder;
|
|
import java.net.URLEncoder;
|
|
@@ -36,6 +37,7 @@ import java.nio.charset.CharsetEncoder;
|
|
import java.nio.charset.StandardCharsets;
|
|
import java.nio.charset.StandardCharsets;
|
|
import java.util.ArrayList;
|
|
import java.util.ArrayList;
|
|
import java.util.Arrays;
|
|
import java.util.Arrays;
|
|
|
|
+import java.util.HashMap;
|
|
import java.util.HashSet;
|
|
import java.util.HashSet;
|
|
import java.util.Hashtable;
|
|
import java.util.Hashtable;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
@@ -50,6 +52,7 @@ import org.xml.sax.SAXException;
|
|
|
|
|
|
import org.apache.commons.io.IOUtils;
|
|
import org.apache.commons.io.IOUtils;
|
|
import org.apache.commons.lang3.StringUtils;
|
|
import org.apache.commons.lang3.StringUtils;
|
|
|
|
+import org.apache.hadoop.fs.FileStatus;
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
import org.apache.hadoop.classification.VisibleForTesting;
|
|
import org.apache.hadoop.classification.VisibleForTesting;
|
|
import org.apache.hadoop.fs.Path;
|
|
import org.apache.hadoop.fs.Path;
|
|
@@ -60,6 +63,7 @@ import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ApiVersion;
|
|
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
|
|
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
|
|
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
|
|
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
|
|
import org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams;
|
|
import org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams;
|
|
|
|
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsDriverException;
|
|
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsInvalidChecksumException;
|
|
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsInvalidChecksumException;
|
|
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
|
|
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
|
|
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
|
|
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
|
|
@@ -70,7 +74,6 @@ import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
|
|
import org.apache.hadoop.fs.azurebfs.contracts.services.BlobListResultEntrySchema;
|
|
import org.apache.hadoop.fs.azurebfs.contracts.services.BlobListResultEntrySchema;
|
|
import org.apache.hadoop.fs.azurebfs.contracts.services.BlobListResultSchema;
|
|
import org.apache.hadoop.fs.azurebfs.contracts.services.BlobListResultSchema;
|
|
import org.apache.hadoop.fs.azurebfs.contracts.services.BlobListXmlParser;
|
|
import org.apache.hadoop.fs.azurebfs.contracts.services.BlobListXmlParser;
|
|
-import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema;
|
|
|
|
import org.apache.hadoop.fs.azurebfs.contracts.services.StorageErrorResponseSchema;
|
|
import org.apache.hadoop.fs.azurebfs.contracts.services.StorageErrorResponseSchema;
|
|
import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider;
|
|
import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider;
|
|
import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider;
|
|
import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider;
|
|
@@ -163,6 +166,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARA
|
|
import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_MAX_RESULTS;
|
|
import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_MAX_RESULTS;
|
|
import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_PREFIX;
|
|
import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_PREFIX;
|
|
import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_RESTYPE;
|
|
import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_RESTYPE;
|
|
|
|
+import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_BLOB_LIST_PARSING;
|
|
import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.PATH_EXISTS;
|
|
import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.PATH_EXISTS;
|
|
import static org.apache.hadoop.fs.azurebfs.utils.UriUtils.isKeyForDirectorySet;
|
|
import static org.apache.hadoop.fs.azurebfs.utils.UriUtils.isKeyForDirectorySet;
|
|
import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ATOMIC_DIR_RENAME_RECOVERY_ON_GET_PATH_EXCEPTION;
|
|
import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ATOMIC_DIR_RENAME_RECOVERY_ON_GET_PATH_EXCEPTION;
|
|
@@ -340,18 +344,19 @@ public class AbfsBlobClient extends AbfsClient {
|
|
* @param listMaxResults maximum number of blobs to return.
|
|
* @param listMaxResults maximum number of blobs to return.
|
|
* @param continuation marker to specify the continuation token.
|
|
* @param continuation marker to specify the continuation token.
|
|
* @param tracingContext for tracing the service call.
|
|
* @param tracingContext for tracing the service call.
|
|
- * @return executed rest operation containing response from server.
|
|
|
|
|
|
+ * @param uri to be used for path conversion.
|
|
|
|
+ * @return {@link ListResponseData} containing the listing response.
|
|
* @throws AzureBlobFileSystemException if rest operation or response parsing fails.
|
|
* @throws AzureBlobFileSystemException if rest operation or response parsing fails.
|
|
*/
|
|
*/
|
|
- public AbfsRestOperation listPath(final String relativePath, final boolean recursive,
|
|
|
|
- final int listMaxResults, final String continuation, TracingContext tracingContext)
|
|
|
|
- throws IOException {
|
|
|
|
- return listPath(relativePath, recursive, listMaxResults, continuation, tracingContext, true);
|
|
|
|
|
|
+ @Override
|
|
|
|
+ public ListResponseData listPath(final String relativePath, final boolean recursive,
|
|
|
|
+ final int listMaxResults, final String continuation, TracingContext tracingContext, URI uri) throws IOException {
|
|
|
|
+ return listPath(relativePath, recursive, listMaxResults, continuation, tracingContext, uri, true);
|
|
}
|
|
}
|
|
|
|
|
|
- public AbfsRestOperation listPath(final String relativePath, final boolean recursive,
|
|
|
|
- final int listMaxResults, final String continuation, TracingContext tracingContext,
|
|
|
|
- boolean is404CheckRequired) throws AzureBlobFileSystemException {
|
|
|
|
|
|
+ public ListResponseData listPath(final String relativePath, final boolean recursive,
|
|
|
|
+ final int listMaxResults, final String continuation, TracingContext tracingContext, URI uri, boolean is404CheckRequired)
|
|
|
|
+ throws AzureBlobFileSystemException {
|
|
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
|
|
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
|
|
|
|
|
|
AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
|
|
AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
|
|
@@ -374,63 +379,63 @@ public class AbfsBlobClient extends AbfsClient {
|
|
requestHeaders);
|
|
requestHeaders);
|
|
|
|
|
|
op.execute(tracingContext);
|
|
op.execute(tracingContext);
|
|
- // Filter the paths for which no rename redo operation is performed.
|
|
|
|
- fixAtomicEntriesInListResults(op, tracingContext);
|
|
|
|
- if (isEmptyListResults(op.getResult()) && is404CheckRequired) {
|
|
|
|
|
|
+ ListResponseData listResponseData = parseListPathResults(op.getResult(), uri);
|
|
|
|
+ listResponseData.setOp(op);
|
|
|
|
+
|
|
|
|
+ // Perform Pending Rename Redo Operation on Atomic Rename Paths.
|
|
|
|
+ // Crashed HBase log rename recovery can be done by Filesystem.listStatus.
|
|
|
|
+ if (tracingContext.getOpType() == FSOperationType.LISTSTATUS
|
|
|
|
+ && op.getResult() != null
|
|
|
|
+ && op.getResult().getStatusCode() == HTTP_OK) {
|
|
|
|
+ retryRenameOnAtomicEntriesInListResults(tracingContext,
|
|
|
|
+ listResponseData.getRenamePendingJsonPaths());
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if (isEmptyListResults(listResponseData) && is404CheckRequired) {
|
|
// If the list operation returns no paths, we need to check if the path is a file.
|
|
// If the list operation returns no paths, we need to check if the path is a file.
|
|
// If it is a file, we need to return the file in the list.
|
|
// If it is a file, we need to return the file in the list.
|
|
// If it is a non-existing path, we need to throw a FileNotFoundException.
|
|
// If it is a non-existing path, we need to throw a FileNotFoundException.
|
|
if (relativePath.equals(ROOT_PATH)) {
|
|
if (relativePath.equals(ROOT_PATH)) {
|
|
- // Root Always exists as directory. It can be a empty listing.
|
|
|
|
- return op;
|
|
|
|
|
|
+ // Root Always exists as directory. It can be an empty listing.
|
|
|
|
+ return listResponseData;
|
|
}
|
|
}
|
|
AbfsRestOperation pathStatus = this.getPathStatus(relativePath, tracingContext, null, false);
|
|
AbfsRestOperation pathStatus = this.getPathStatus(relativePath, tracingContext, null, false);
|
|
BlobListResultSchema listResultSchema = getListResultSchemaFromPathStatus(relativePath, pathStatus);
|
|
BlobListResultSchema listResultSchema = getListResultSchemaFromPathStatus(relativePath, pathStatus);
|
|
|
|
+ LOG.debug("ListBlob attempted on a file path. Returning file status.");
|
|
|
|
+ List<FileStatus> fileStatusList = new ArrayList<>();
|
|
|
|
+ for (BlobListResultEntrySchema entry : listResultSchema.paths()) {
|
|
|
|
+ fileStatusList.add(getVersionedFileStatusFromEntry(entry, uri));
|
|
|
|
+ }
|
|
AbfsRestOperation listOp = getAbfsRestOperation(
|
|
AbfsRestOperation listOp = getAbfsRestOperation(
|
|
AbfsRestOperationType.ListBlobs,
|
|
AbfsRestOperationType.ListBlobs,
|
|
HTTP_METHOD_GET,
|
|
HTTP_METHOD_GET,
|
|
url,
|
|
url,
|
|
requestHeaders);
|
|
requestHeaders);
|
|
listOp.hardSetGetListStatusResult(HTTP_OK, listResultSchema);
|
|
listOp.hardSetGetListStatusResult(HTTP_OK, listResultSchema);
|
|
- return listOp;
|
|
|
|
|
|
+ listResponseData.setFileStatusList(fileStatusList);
|
|
|
|
+ listResponseData.setContinuationToken(null);
|
|
|
|
+ listResponseData.setRenamePendingJsonPaths(null);
|
|
|
|
+ listResponseData.setOp(listOp);
|
|
|
|
+ return listResponseData;
|
|
}
|
|
}
|
|
- return op;
|
|
|
|
|
|
+ return listResponseData;
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
* Filter the paths for which no rename redo operation is performed.
|
|
* Filter the paths for which no rename redo operation is performed.
|
|
* Update BlobListResultSchema path with filtered entries.
|
|
* Update BlobListResultSchema path with filtered entries.
|
|
- *
|
|
|
|
- * @param op blob list operation
|
|
|
|
* @param tracingContext tracing context
|
|
* @param tracingContext tracing context
|
|
* @throws AzureBlobFileSystemException if rest operation or response parsing fails.
|
|
* @throws AzureBlobFileSystemException if rest operation or response parsing fails.
|
|
*/
|
|
*/
|
|
- private void fixAtomicEntriesInListResults(final AbfsRestOperation op,
|
|
|
|
- final TracingContext tracingContext) throws AzureBlobFileSystemException {
|
|
|
|
- /*
|
|
|
|
- * Crashed HBase log rename recovery is done by Filesystem.getFileStatus and
|
|
|
|
- * Filesystem.listStatus.
|
|
|
|
- */
|
|
|
|
- if (tracingContext == null
|
|
|
|
- || tracingContext.getOpType() != FSOperationType.LISTSTATUS
|
|
|
|
- || op == null || op.getResult() == null
|
|
|
|
- || op.getResult().getStatusCode() != HTTP_OK) {
|
|
|
|
|
|
+ private void retryRenameOnAtomicEntriesInListResults(TracingContext tracingContext,
|
|
|
|
+ Map<Path, Integer> renamePendingJsonPaths) throws AzureBlobFileSystemException {
|
|
|
|
+ if (renamePendingJsonPaths == null || renamePendingJsonPaths.isEmpty()) {
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
- BlobListResultSchema listResultSchema
|
|
|
|
- = (BlobListResultSchema) op.getResult().getListResultSchema();
|
|
|
|
- if (listResultSchema == null) {
|
|
|
|
- return;
|
|
|
|
- }
|
|
|
|
- List<BlobListResultEntrySchema> filteredEntries = new ArrayList<>();
|
|
|
|
- for (BlobListResultEntrySchema entry : listResultSchema.paths()) {
|
|
|
|
- if (!takeListPathAtomicRenameKeyAction(entry.path(), entry.isDirectory(),
|
|
|
|
- entry.contentLength().intValue(), tracingContext)) {
|
|
|
|
- filteredEntries.add(entry);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
|
|
|
|
- listResultSchema.withPaths(filteredEntries);
|
|
|
|
|
|
+ for (Map.Entry<Path, Integer> entry : renamePendingJsonPaths.entrySet()) {
|
|
|
|
+ retryRenameOnAtomicKeyPath(entry.getKey(), entry.getValue(), tracingContext);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
/**{@inheritDoc}*/
|
|
/**{@inheritDoc}*/
|
|
@@ -1162,10 +1167,7 @@ public class AbfsBlobClient extends AbfsClient {
|
|
throws AzureBlobFileSystemException {
|
|
throws AzureBlobFileSystemException {
|
|
AbfsRestOperation op = this.getPathStatus(path, tracingContext,
|
|
AbfsRestOperation op = this.getPathStatus(path, tracingContext,
|
|
contextEncryptionAdapter, true);
|
|
contextEncryptionAdapter, true);
|
|
- /*
|
|
|
|
- * Crashed HBase log-folder rename can be recovered by FileSystem#getFileStatus
|
|
|
|
- * and FileSystem#listStatus calls.
|
|
|
|
- */
|
|
|
|
|
|
+ // Crashed HBase log-folder rename can be recovered by FileSystem#getFileStatus
|
|
if (tracingContext.getOpType() == FSOperationType.GET_FILESTATUS
|
|
if (tracingContext.getOpType() == FSOperationType.GET_FILESTATUS
|
|
&& op.getResult() != null && checkIsDir(op.getResult())) {
|
|
&& op.getResult() != null && checkIsDir(op.getResult())) {
|
|
takeGetPathStatusAtomicRenameKeyAction(new Path(path), tracingContext);
|
|
takeGetPathStatusAtomicRenameKeyAction(new Path(path), tracingContext);
|
|
@@ -1212,6 +1214,8 @@ public class AbfsBlobClient extends AbfsClient {
|
|
if (op.getResult().getStatusCode() == HTTP_NOT_FOUND
|
|
if (op.getResult().getStatusCode() == HTTP_NOT_FOUND
|
|
&& isImplicitCheckRequired && isNonEmptyDirectory(path, tracingContext)) {
|
|
&& isImplicitCheckRequired && isNonEmptyDirectory(path, tracingContext)) {
|
|
// Implicit path found.
|
|
// Implicit path found.
|
|
|
|
+ // Create a marker blob at this path.
|
|
|
|
+ this.createMarkerAtPath(path, null, contextEncryptionAdapter, tracingContext);
|
|
AbfsRestOperation successOp = getSuccessOp(
|
|
AbfsRestOperation successOp = getSuccessOp(
|
|
AbfsRestOperationType.GetPathStatus, HTTP_METHOD_HEAD,
|
|
AbfsRestOperationType.GetPathStatus, HTTP_METHOD_HEAD,
|
|
url, requestHeaders);
|
|
url, requestHeaders);
|
|
@@ -1562,18 +1566,6 @@ public class AbfsBlobClient extends AbfsClient {
|
|
&& responseStatusCode != HTTP_CONFLICT);
|
|
&& responseStatusCode != HTTP_CONFLICT);
|
|
}
|
|
}
|
|
|
|
|
|
- /**
|
|
|
|
- * Get the continuation token from the response from BLOB Endpoint Listing.
|
|
|
|
- * Continuation Token will be present in XML List response body.
|
|
|
|
- * @param result The response from the server.
|
|
|
|
- * @return The continuation token.
|
|
|
|
- */
|
|
|
|
- @Override
|
|
|
|
- public String getContinuationFromResponse(AbfsHttpOperation result) {
|
|
|
|
- BlobListResultSchema listResultSchema = (BlobListResultSchema) result.getListResultSchema();
|
|
|
|
- return listResultSchema.getNextMarker();
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
/**
|
|
/**
|
|
* Get the User-defined metadata on a path from response headers of
|
|
* Get the User-defined metadata on a path from response headers of
|
|
* GetBlobProperties API on Blob Endpoint.
|
|
* GetBlobProperties API on Blob Endpoint.
|
|
@@ -1604,26 +1596,43 @@ public class AbfsBlobClient extends AbfsClient {
|
|
|
|
|
|
/**
|
|
/**
|
|
* Parse the XML response body returned by ListBlob API on Blob Endpoint.
|
|
* Parse the XML response body returned by ListBlob API on Blob Endpoint.
|
|
- * @param stream InputStream contains the response from server.
|
|
|
|
- * @return BlobListResultSchema containing the list of entries.
|
|
|
|
- * @throws IOException if parsing fails.
|
|
|
|
|
|
+ * @param result HTTP operation whose list result stream contains the response from server.
|
|
|
|
+ * @param uri to be used for path conversion.
|
|
|
|
+ * @return {@link ListResponseData} containing the listing response.
|
|
|
|
+ * @throws AzureBlobFileSystemException if parsing fails.
|
|
*/
|
|
*/
|
|
@Override
|
|
@Override
|
|
- public ListResultSchema parseListPathResults(final InputStream stream) throws IOException {
|
|
|
|
- if (stream == null) {
|
|
|
|
- return null;
|
|
|
|
- }
|
|
|
|
|
|
+ public ListResponseData parseListPathResults(AbfsHttpOperation result, URI uri)
|
|
|
|
+ throws AzureBlobFileSystemException {
|
|
BlobListResultSchema listResultSchema;
|
|
BlobListResultSchema listResultSchema;
|
|
- try {
|
|
|
|
- final SAXParser saxParser = saxParserThreadLocal.get();
|
|
|
|
- saxParser.reset();
|
|
|
|
- listResultSchema = new BlobListResultSchema();
|
|
|
|
- saxParser.parse(stream, new BlobListXmlParser(listResultSchema, getBaseUrl().toString()));
|
|
|
|
- } catch (SAXException | IOException e) {
|
|
|
|
- throw new RuntimeException(e);
|
|
|
|
|
|
+ try (InputStream stream = result.getListResultStream()) {
|
|
|
|
+ if (stream == null) {
|
|
|
|
+ return null;
|
|
|
|
+ }
|
|
|
|
+ try {
|
|
|
|
+ final SAXParser saxParser = saxParserThreadLocal.get();
|
|
|
|
+ saxParser.reset();
|
|
|
|
+ listResultSchema = new BlobListResultSchema();
|
|
|
|
+ saxParser.parse(stream,
|
|
|
|
+ new BlobListXmlParser(listResultSchema, getBaseUrl().toString()));
|
|
|
|
+ result.setListResultSchema(listResultSchema);
|
|
|
|
+ LOG.debug("ListBlobs listed {} blobs with {} as continuation token",
|
|
|
|
+ listResultSchema.paths().size(),
|
|
|
|
+ listResultSchema.getNextMarker());
|
|
|
|
+ } catch (SAXException | IOException e) {
|
|
|
|
+ throw new AbfsDriverException(e);
|
|
|
|
+ }
|
|
|
|
+ } catch (IOException e) {
|
|
|
|
+ LOG.error("Unable to deserialize list results for uri {}", uri.toString(), e);
|
|
|
|
+ throw new AbfsDriverException(ERR_BLOB_LIST_PARSING, e);
|
|
}
|
|
}
|
|
|
|
|
|
- return removeDuplicateEntries(listResultSchema);
|
|
|
|
|
|
+ try {
|
|
|
|
+ return filterDuplicateEntriesAndRenamePendingFiles(listResultSchema, uri);
|
|
|
|
+ } catch (IOException e) {
|
|
|
|
+ LOG.error("Unable to filter list results for uri {}", uri.toString(), e);
|
|
|
|
+ throw new AbfsDriverException(ERR_BLOB_LIST_PARSING, e);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -1764,13 +1773,14 @@ public class AbfsBlobClient extends AbfsClient {
|
|
AbfsRestOperation pendingJsonFileStatus;
|
|
AbfsRestOperation pendingJsonFileStatus;
|
|
Path pendingJsonPath = new Path(path.getParent(),
|
|
Path pendingJsonPath = new Path(path.getParent(),
|
|
path.toUri().getPath() + RenameAtomicity.SUFFIX);
|
|
path.toUri().getPath() + RenameAtomicity.SUFFIX);
|
|
|
|
+ int pendingJsonFileContentLength = 0;
|
|
try {
|
|
try {
|
|
- pendingJsonFileStatus = getPathStatus(
|
|
|
|
- pendingJsonPath.toUri().getPath(), tracingContext,
|
|
|
|
- null, false);
|
|
|
|
|
|
+ pendingJsonFileStatus = getPathStatus(pendingJsonPath.toUri().getPath(),
|
|
|
|
+ tracingContext, null, false);
|
|
if (checkIsDir(pendingJsonFileStatus.getResult())) {
|
|
if (checkIsDir(pendingJsonFileStatus.getResult())) {
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
|
|
+ pendingJsonFileContentLength = Integer.parseInt(pendingJsonFileStatus.getResult().getResponseHeader(CONTENT_LENGTH));
|
|
} catch (AbfsRestOperationException ex) {
|
|
} catch (AbfsRestOperationException ex) {
|
|
if (ex.getStatusCode() == HTTP_NOT_FOUND) {
|
|
if (ex.getStatusCode() == HTTP_NOT_FOUND) {
|
|
return;
|
|
return;
|
|
@@ -1781,9 +1791,7 @@ public class AbfsBlobClient extends AbfsClient {
|
|
boolean renameSrcHasChanged;
|
|
boolean renameSrcHasChanged;
|
|
try {
|
|
try {
|
|
RenameAtomicity renameAtomicity = getRedoRenameAtomicity(
|
|
RenameAtomicity renameAtomicity = getRedoRenameAtomicity(
|
|
- pendingJsonPath, Integer.parseInt(pendingJsonFileStatus.getResult()
|
|
|
|
- .getResponseHeader(CONTENT_LENGTH)),
|
|
|
|
- tracingContext);
|
|
|
|
|
|
+ pendingJsonPath, pendingJsonFileContentLength, tracingContext);
|
|
renameAtomicity.redo();
|
|
renameAtomicity.redo();
|
|
renameSrcHasChanged = false;
|
|
renameSrcHasChanged = false;
|
|
} catch (AbfsRestOperationException ex) {
|
|
} catch (AbfsRestOperationException ex) {
|
|
@@ -1818,23 +1826,16 @@ public class AbfsBlobClient extends AbfsClient {
|
|
* @param renamePendingJsonLen length of the pendingJson file.
|
|
* @param renamePendingJsonLen length of the pendingJson file.
|
|
* @param tracingContext tracing context.
|
|
* @param tracingContext tracing context.
|
|
*
|
|
*
|
|
- * @return true if action is taken.
|
|
|
|
* @throws AzureBlobFileSystemException server error
|
|
* @throws AzureBlobFileSystemException server error
|
|
*/
|
|
*/
|
|
- private boolean takeListPathAtomicRenameKeyAction(final Path path,
|
|
|
|
- final boolean isDirectory, final int renamePendingJsonLen,
|
|
|
|
|
|
+
|
|
|
|
+ private void retryRenameOnAtomicKeyPath(final Path path,
|
|
|
|
+ final int renamePendingJsonLen,
|
|
final TracingContext tracingContext)
|
|
final TracingContext tracingContext)
|
|
throws AzureBlobFileSystemException {
|
|
throws AzureBlobFileSystemException {
|
|
- if (path == null || path.isRoot() || !isAtomicRenameKey(
|
|
|
|
- path.toUri().getPath()) || isDirectory || !path.toUri()
|
|
|
|
- .getPath()
|
|
|
|
- .endsWith(RenameAtomicity.SUFFIX)) {
|
|
|
|
- return false;
|
|
|
|
- }
|
|
|
|
try {
|
|
try {
|
|
- RenameAtomicity renameAtomicity
|
|
|
|
- = getRedoRenameAtomicity(path, renamePendingJsonLen,
|
|
|
|
- tracingContext);
|
|
|
|
|
|
+ RenameAtomicity renameAtomicity = getRedoRenameAtomicity(path,
|
|
|
|
+ renamePendingJsonLen, tracingContext);
|
|
renameAtomicity.redo();
|
|
renameAtomicity.redo();
|
|
} catch (AbfsRestOperationException ex) {
|
|
} catch (AbfsRestOperationException ex) {
|
|
/*
|
|
/*
|
|
@@ -1850,7 +1851,6 @@ public class AbfsBlobClient extends AbfsClient {
|
|
throw ex;
|
|
throw ex;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- return true;
|
|
|
|
}
|
|
}
|
|
|
|
|
|
@VisibleForTesting
|
|
@VisibleForTesting
|
|
@@ -1924,39 +1924,65 @@ public class AbfsBlobClient extends AbfsClient {
|
|
* This is to handle duplicate listing entries returned by Blob Endpoint for
|
|
* This is to handle duplicate listing entries returned by Blob Endpoint for
|
|
* implicit paths that also has a marker file created for them.
|
|
* implicit paths that also has a marker file created for them.
|
|
* This will retain entry corresponding to marker file and remove the BlobPrefix entry.
|
|
* This will retain entry corresponding to marker file and remove the BlobPrefix entry.
|
|
|
|
+ * This will also filter out all the rename pending json files in listing output.
|
|
* @param listResultSchema List of entries returned by Blob Endpoint.
|
|
* @param listResultSchema List of entries returned by Blob Endpoint.
|
|
|
|
+ * @param uri URI to be used for path conversion.
|
|
* @return List of entries after removing duplicates.
|
|
* @return List of entries after removing duplicates.
|
|
*/
|
|
*/
|
|
- private BlobListResultSchema removeDuplicateEntries(BlobListResultSchema listResultSchema) {
|
|
|
|
- List<BlobListResultEntrySchema> uniqueEntries = new ArrayList<>();
|
|
|
|
|
|
+ private ListResponseData filterDuplicateEntriesAndRenamePendingFiles(
|
|
|
|
+ BlobListResultSchema listResultSchema, URI uri) throws IOException {
|
|
|
|
+ List<FileStatus> fileStatuses = new ArrayList<>();
|
|
|
|
+ Map<Path, Integer> renamePendingJsonPaths = new HashMap<>();
|
|
TreeMap<String, BlobListResultEntrySchema> nameToEntryMap = new TreeMap<>();
|
|
TreeMap<String, BlobListResultEntrySchema> nameToEntryMap = new TreeMap<>();
|
|
|
|
|
|
for (BlobListResultEntrySchema entry : listResultSchema.paths()) {
|
|
for (BlobListResultEntrySchema entry : listResultSchema.paths()) {
|
|
if (StringUtils.isNotEmpty(entry.eTag())) {
|
|
if (StringUtils.isNotEmpty(entry.eTag())) {
|
|
// This is a blob entry. It is either a file or a marker blob.
|
|
// This is a blob entry. It is either a file or a marker blob.
|
|
// In both cases we will add this.
|
|
// In both cases we will add this.
|
|
- nameToEntryMap.put(entry.name(), entry);
|
|
|
|
|
|
+ if (isRenamePendingJsonPathEntry(entry)) {
|
|
|
|
+ renamePendingJsonPaths.put(entry.path(), entry.contentLength().intValue());
|
|
|
|
+ } else {
|
|
|
|
+ nameToEntryMap.put(entry.name(), entry);
|
|
|
|
+ fileStatuses.add(getVersionedFileStatusFromEntry(entry, uri));
|
|
|
|
+ }
|
|
} else {
|
|
} else {
|
|
// This is a BlobPrefix entry. It is a directory with file inside
|
|
// This is a BlobPrefix entry. It is a directory with file inside
|
|
// This might have already been added as a marker blob.
|
|
// This might have already been added as a marker blob.
|
|
if (!nameToEntryMap.containsKey(entry.name())) {
|
|
if (!nameToEntryMap.containsKey(entry.name())) {
|
|
nameToEntryMap.put(entry.name(), entry);
|
|
nameToEntryMap.put(entry.name(), entry);
|
|
|
|
+ fileStatuses.add(getVersionedFileStatusFromEntry(entry, uri));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- uniqueEntries.addAll(nameToEntryMap.values());
|
|
|
|
- listResultSchema.withPaths(uniqueEntries);
|
|
|
|
- return listResultSchema;
|
|
|
|
|
|
+ ListResponseData listResponseData = new ListResponseData();
|
|
|
|
+ listResponseData.setFileStatusList(fileStatuses);
|
|
|
|
+ listResponseData.setRenamePendingJsonPaths(renamePendingJsonPaths);
|
|
|
|
+ listResponseData.setContinuationToken(listResultSchema.getNextMarker());
|
|
|
|
+ return listResponseData;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Check if the entry is a rename pending json file path.
|
|
|
|
+ * @param entry to be checked.
|
|
|
|
+ * @return true if it is a rename pending json file path.
|
|
|
|
+ */
|
|
|
|
+ private boolean isRenamePendingJsonPathEntry(BlobListResultEntrySchema entry) {
|
|
|
|
+ String path = entry.path() != null ? entry.path().toUri().getPath() : null;
|
|
|
|
+ return path != null
|
|
|
|
+ && !entry.path().isRoot()
|
|
|
|
+ && isAtomicRenameKey(path)
|
|
|
|
+ && !entry.isDirectory()
|
|
|
|
+ && path.endsWith(RenameAtomicity.SUFFIX);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
* When listing is done on a file, Blob Endpoint returns the empty listing
|
|
* When listing is done on a file, Blob Endpoint returns the empty listing
|
|
* but DFS Endpoint returns the file status as one of the entries.
|
|
* but DFS Endpoint returns the file status as one of the entries.
|
|
* This is to convert file status into ListResultSchema.
|
|
* This is to convert file status into ListResultSchema.
|
|
- * @param relativePath
|
|
|
|
- * @param pathStatus
|
|
|
|
- * @return
|
|
|
|
|
|
+ * @param relativePath relative path of the file.
|
|
|
|
+ * @param pathStatus AbfsRestOperation containing the file status.
|
|
|
|
+ * @return BlobListResultSchema containing the file status.
|
|
*/
|
|
*/
|
|
private BlobListResultSchema getListResultSchemaFromPathStatus(String relativePath, AbfsRestOperation pathStatus) {
|
|
private BlobListResultSchema getListResultSchemaFromPathStatus(String relativePath, AbfsRestOperation pathStatus) {
|
|
BlobListResultSchema listResultSchema = new BlobListResultSchema();
|
|
BlobListResultSchema listResultSchema = new BlobListResultSchema();
|
|
@@ -2001,27 +2027,26 @@ public class AbfsBlobClient extends AbfsClient {
|
|
@VisibleForTesting
|
|
@VisibleForTesting
|
|
public boolean isNonEmptyDirectory(String path,
|
|
public boolean isNonEmptyDirectory(String path,
|
|
TracingContext tracingContext) throws AzureBlobFileSystemException {
|
|
TracingContext tracingContext) throws AzureBlobFileSystemException {
|
|
- AbfsRestOperation listOp = listPath(path, false, 1, null, tracingContext,
|
|
|
|
- false);
|
|
|
|
- return !isEmptyListResults(listOp.getResult());
|
|
|
|
|
|
+ // This method is only called internally to determine state of a path
|
|
|
|
+ // and hence doesn't need identity transformation to happen.
|
|
|
|
+ ListResponseData listResponseData = listPath(path, false, 1, null, tracingContext, null, false);
|
|
|
|
+ return !isEmptyListResults(listResponseData);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
* Check if the list call returned empty results without any continuation token.
|
|
* Check if the list call returned empty results without any continuation token.
|
|
- * @param result The response of listing API from the server.
|
|
|
|
|
|
+ * @param listResponseData The response of listing API from the server.
|
|
* @return True if empty results without continuation token.
|
|
* @return True if empty results without continuation token.
|
|
*/
|
|
*/
|
|
- private boolean isEmptyListResults(AbfsHttpOperation result) {
|
|
|
|
|
|
+ private boolean isEmptyListResults(ListResponseData listResponseData) {
|
|
|
|
+ AbfsHttpOperation result = listResponseData.getOp().getResult();
|
|
boolean isEmptyList = result != null && result.getStatusCode() == HTTP_OK && // List Call was successful
|
|
boolean isEmptyList = result != null && result.getStatusCode() == HTTP_OK && // List Call was successful
|
|
result.getListResultSchema() != null && // Parsing of list response was successful
|
|
result.getListResultSchema() != null && // Parsing of list response was successful
|
|
- result.getListResultSchema().paths().isEmpty() && // No paths were returned
|
|
|
|
- result.getListResultSchema() instanceof BlobListResultSchema && // It is safe to typecast to BlobListResultSchema
|
|
|
|
- ((BlobListResultSchema) result.getListResultSchema()).getNextMarker() == null; // No continuation token was returned
|
|
|
|
|
|
+ listResponseData.getFileStatusList().isEmpty() && listResponseData.getRenamePendingJsonPaths().isEmpty() &&// No paths were returned
|
|
|
|
+ StringUtils.isEmpty(listResponseData.getContinuationToken()); // No continuation token was returned
|
|
if (isEmptyList) {
|
|
if (isEmptyList) {
|
|
LOG.debug("List call returned empty results without any continuation token.");
|
|
LOG.debug("List call returned empty results without any continuation token.");
|
|
return true;
|
|
return true;
|
|
- } else if (result != null && !(result.getListResultSchema() instanceof BlobListResultSchema)) {
|
|
|
|
- throw new RuntimeException("List call returned unexpected results over Blob Endpoint.");
|
|
|
|
}
|
|
}
|
|
return false;
|
|
return false;
|
|
}
|
|
}
|