@@ -0,0 +1,2222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import static org.apache.hadoop.fs.azure.NativeAzureFileSystem.PATH_DELIMITER;
+
+import java.io.BufferedInputStream;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.azure.StorageInterface.CloudBlobContainerWrapper;
+import org.apache.hadoop.fs.azure.StorageInterface.CloudBlobDirectoryWrapper;
+import org.apache.hadoop.fs.azure.StorageInterface.CloudBlockBlobWrapper;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.mortbay.util.ajax.JSON;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.microsoft.windowsazure.storage.CloudStorageAccount;
+import com.microsoft.windowsazure.storage.OperationContext;
+import com.microsoft.windowsazure.storage.RetryExponentialRetry;
+import com.microsoft.windowsazure.storage.RetryNoRetry;
+import com.microsoft.windowsazure.storage.StorageCredentials;
+import com.microsoft.windowsazure.storage.StorageCredentialsAccountAndKey;
+import com.microsoft.windowsazure.storage.StorageCredentialsSharedAccessSignature;
+import com.microsoft.windowsazure.storage.StorageErrorCode;
+import com.microsoft.windowsazure.storage.StorageException;
+import com.microsoft.windowsazure.storage.blob.BlobListingDetails;
+import com.microsoft.windowsazure.storage.blob.BlobProperties;
+import com.microsoft.windowsazure.storage.blob.BlobRequestOptions;
+import com.microsoft.windowsazure.storage.blob.CloudBlob;
+import com.microsoft.windowsazure.storage.blob.CopyStatus;
+import com.microsoft.windowsazure.storage.blob.DeleteSnapshotsOption;
+import com.microsoft.windowsazure.storage.blob.ListBlobItem;
+import com.microsoft.windowsazure.storage.core.Utility;
+
+@InterfaceAudience.Private
+class AzureNativeFileSystemStore implements NativeFileSystemStore {
+
+  /**
+   * Configuration knob on whether we do block-level MD5 validation on
+   * upload/download.
+   */
+  static final String KEY_CHECK_BLOCK_MD5 = "fs.azure.check.block.md5";
+  /**
+   * Configuration knob on whether we store blob-level MD5 on upload.
+   */
+  static final String KEY_STORE_BLOB_MD5 = "fs.azure.store.blob.md5";
+  static final String DEFAULT_STORAGE_EMULATOR_ACCOUNT_NAME = "storageemulator";
+  static final String STORAGE_EMULATOR_ACCOUNT_NAME_PROPERTY_NAME = "fs.azure.storage.emulator.account.name";
+
+  public static final Log LOG = LogFactory
+      .getLog(AzureNativeFileSystemStore.class);
+
+  private StorageInterface storageInteractionLayer;
+  private CloudBlobDirectoryWrapper rootDirectory;
+  private CloudBlobContainerWrapper container;
+
+  // Constants local to this class.
+  //
+  private static final String KEY_ACCOUNT_KEYPROVIDER_PREFIX = "fs.azure.account.keyprovider.";
+  private static final String KEY_ACCOUNT_SAS_PREFIX = "fs.azure.sas.";
+
+  // Note: this value is not present in core-default.xml as our real default
+  // is computed as min(2*cpu, 8).
+  private static final String KEY_CONCURRENT_CONNECTION_VALUE_OUT = "fs.azure.concurrentRequestCount.out";
+
+  private static final String KEY_STREAM_MIN_READ_SIZE = "fs.azure.read.request.size";
+  private static final String KEY_STORAGE_CONNECTION_TIMEOUT = "fs.azure.storage.timeout";
+  private static final String KEY_WRITE_BLOCK_SIZE = "fs.azure.write.request.size";
+
+  // Property controlling whether to allow reads on a blob that is
+  // concurrently appended out-of-band.
+  private static final String KEY_READ_TOLERATE_CONCURRENT_APPEND = "fs.azure.io.read.tolerate.concurrent.append";
+
+  // Configurable throttling parameter properties. These properties are located
+  // in the core-site.xml configuration file.
+  private static final String KEY_MIN_BACKOFF_INTERVAL = "fs.azure.io.retry.min.backoff.interval";
+  private static final String KEY_MAX_BACKOFF_INTERVAL = "fs.azure.io.retry.max.backoff.interval";
+  private static final String KEY_BACKOFF_INTERVAL = "fs.azure.io.retry.backoff.interval";
+  private static final String KEY_MAX_IO_RETRIES = "fs.azure.io.retry.max.retries";
+
+  private static final String KEY_SELF_THROTTLE_ENABLE = "fs.azure.selfthrottling.enable";
+  private static final String KEY_SELF_THROTTLE_READ_FACTOR = "fs.azure.selfthrottling.read.factor";
+  private static final String KEY_SELF_THROTTLE_WRITE_FACTOR = "fs.azure.selfthrottling.write.factor";
+
+  private static final String PERMISSION_METADATA_KEY = "hdi_permission";
+  private static final String OLD_PERMISSION_METADATA_KEY = "asv_permission";
+  private static final String IS_FOLDER_METADATA_KEY = "hdi_isfolder";
+  private static final String OLD_IS_FOLDER_METADATA_KEY = "asv_isfolder";
+  static final String VERSION_METADATA_KEY = "hdi_version";
+  static final String OLD_VERSION_METADATA_KEY = "asv_version";
+  static final String FIRST_WASB_VERSION = "2013-01-01";
+  static final String CURRENT_WASB_VERSION = "2013-09-01";
+  static final String LINK_BACK_TO_UPLOAD_IN_PROGRESS_METADATA_KEY = "hdi_tmpupload";
+  static final String OLD_LINK_BACK_TO_UPLOAD_IN_PROGRESS_METADATA_KEY = "asv_tmpupload";
+
+  private static final String HTTP_SCHEME = "http";
+  private static final String HTTPS_SCHEME = "https";
+  private static final String WASB_AUTHORITY_DELIMITER = "@";
+  private static final String AZURE_ROOT_CONTAINER = "$root";
+
+  private static final int DEFAULT_CONCURRENT_WRITES = 8;
+
+  // Concurrent reads of data written out of band are disabled by default.
+  private static final boolean DEFAULT_READ_TOLERATE_CONCURRENT_APPEND = false;
+
+  // Default block sizes
+  public static final int DEFAULT_DOWNLOAD_BLOCK_SIZE = 4 * 1024 * 1024;
+  public static final int DEFAULT_UPLOAD_BLOCK_SIZE = 4 * 1024 * 1024;
+
+  // Retry parameter defaults.
+  private static final int DEFAULT_MIN_BACKOFF_INTERVAL = 1 * 1000; // 1s
+  private static final int DEFAULT_MAX_BACKOFF_INTERVAL = 30 * 1000; // 30s
+  private static final int DEFAULT_BACKOFF_INTERVAL = 1 * 1000; // 1s
+  private static final int DEFAULT_MAX_RETRY_ATTEMPTS = 15;
+
+  // Self-throttling defaults. Allowed range = (0, 1.0]
+  // A value of 1.0 means no self-throttling.
+  // A value of x means process data at factor x of the unrestricted rate.
+  private static final boolean DEFAULT_SELF_THROTTLE_ENABLE = true;
+  private static final float DEFAULT_SELF_THROTTLE_READ_FACTOR = 1.0f;
+  private static final float DEFAULT_SELF_THROTTLE_WRITE_FACTOR = 1.0f;
+
+  private static final int STORAGE_CONNECTION_TIMEOUT_DEFAULT = 90;
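+
+  /*
+   * Illustrative sketch (not part of the original change): the knobs above
+   * can be tuned through the standard Hadoop Configuration API (or
+   * core-site.xml) before the file system is initialized. The values shown
+   * are hypothetical:
+   *
+   *   Configuration conf = new Configuration();
+   *   conf.setBoolean("fs.azure.check.block.md5", false); // block-level MD5
+   *   conf.setBoolean("fs.azure.store.blob.md5", true);   // blob-level MD5
+   *   conf.setInt("fs.azure.read.request.size", 8 * 1024 * 1024);
+   *   conf.setInt("fs.azure.write.request.size", 8 * 1024 * 1024);
+   */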
+
+  /**
+   * MEMBER VARIABLES
+   */
+
+  private URI sessionUri;
+  private Configuration sessionConfiguration;
+  private int concurrentWrites = DEFAULT_CONCURRENT_WRITES;
+  private boolean isAnonymousCredentials = false;
+  // Set to true if we are connecting using shared access signatures.
+  private boolean connectingUsingSAS = false;
+  private static final JSON PERMISSION_JSON_SERIALIZER = createPermissionJsonSerializer();
+
+  private boolean suppressRetryPolicy = false;
+  private boolean canCreateOrModifyContainer = false;
+  private ContainerState currentKnownContainerState = ContainerState.Unknown;
+  private final Object containerStateLock = new Object();
+
+  private boolean tolerateOobAppends = DEFAULT_READ_TOLERATE_CONCURRENT_APPEND;
+
+  private int downloadBlockSizeBytes = DEFAULT_DOWNLOAD_BLOCK_SIZE;
+  private int uploadBlockSizeBytes = DEFAULT_UPLOAD_BLOCK_SIZE;
+
+  // Bandwidth throttling exponential back-off parameters
+  //
+  private int minBackoff; // the minimum back-off interval (ms) between retries.
+  private int maxBackoff; // the maximum back-off interval (ms) between retries.
+  private int deltaBackoff; // the back-off interval (ms) between retries.
+  private int maxRetries; // the maximum number of retry attempts.
+
+  // Self-throttling parameters
+  private boolean selfThrottlingEnabled;
+  private float selfThrottlingReadFactor;
+  private float selfThrottlingWriteFactor;
+
+  private TestHookOperationContext testHookOperationContext = null;
+
+  // Set if we're running against a storage emulator.
+  private boolean isStorageEmulator = false;
+
+  /**
+   * A test hook interface that can modify the operation context we use for
+   * Azure Storage operations, e.g. to inject errors.
+   */
+  @VisibleForTesting
+  interface TestHookOperationContext {
+    OperationContext modifyOperationContext(OperationContext original);
+  }
+
+  /**
+   * Suppress the default retry policy for the storage client, useful in unit
+   * tests to test negative cases without waiting forever.
+   */
+  @VisibleForTesting
+  void suppressRetryPolicy() {
+    suppressRetryPolicy = true;
+  }
+
+  /**
+   * Add a test hook to modify the operation context we use for Azure Storage
+   * operations.
+   *
+   * @param testHook
+   *          The test hook, or null to unset previous hooks.
+   */
+  @VisibleForTesting
+  void addTestHookToOperationContext(TestHookOperationContext testHook) {
+    this.testHookOperationContext = testHook;
+  }
+
+  /**
+   * If we're asked by unit tests to not retry, set the retry policy factory
+   * in the client accordingly.
+   */
+  private void suppressRetryPolicyInClientIfNeeded() {
+    if (suppressRetryPolicy) {
+      storageInteractionLayer.setRetryPolicyFactory(new RetryNoRetry());
+    }
+  }
+
+  /**
+   * Creates a JSON serializer that can serialize a PermissionStatus object
+   * into the JSON string we want in the blob metadata.
+   *
+   * @return The JSON serializer.
+   */
+  private static JSON createPermissionJsonSerializer() {
+    JSON serializer = new JSON();
+    serializer.addConvertor(PermissionStatus.class,
+        new PermissionStatusJsonSerializer());
+    return serializer;
+  }
+
+  /**
+   * A converter for PermissionStatus to/from JSON as we want it in the blob
+   * metadata.
+   */
+  private static class PermissionStatusJsonSerializer implements JSON.Convertor {
+    private static final String OWNER_TAG = "owner";
+    private static final String GROUP_TAG = "group";
+    private static final String PERMISSIONS_TAG = "permissions";
+
+    @Override
+    public void toJSON(Object obj, JSON.Output out) {
+      PermissionStatus permissionStatus = (PermissionStatus) obj;
+      // Don't store group as null, just store it as empty string
+      // (which is FileStatus behavior).
+      String group = permissionStatus.getGroupName() == null ? ""
+          : permissionStatus.getGroupName();
+      out.add(OWNER_TAG, permissionStatus.getUserName());
+      out.add(GROUP_TAG, group);
+      out.add(PERMISSIONS_TAG, permissionStatus.getPermission().toString());
+    }
+
+    @Override
+    public Object fromJSON(@SuppressWarnings("rawtypes") Map object) {
+      return PermissionStatusJsonSerializer.fromJSONMap(object);
+    }
+
+    @SuppressWarnings("rawtypes")
+    public static PermissionStatus fromJSONString(String jsonString) {
+      // The JSON class can only find out about an object's class (and call me)
+      // if we store the class name in the JSON string. Since I don't want to
+      // do that (it's an implementation detail), I just deserialize as the
+      // default Map (JSON's default behavior) and parse that.
+      return fromJSONMap((Map) PERMISSION_JSON_SERIALIZER.fromJSON(jsonString));
+    }
+
+    private static PermissionStatus fromJSONMap(
+        @SuppressWarnings("rawtypes") Map object) {
+      return new PermissionStatus((String) object.get(OWNER_TAG),
+          (String) object.get(GROUP_TAG),
+          // The initial - below is the Unix file type,
+          // which FsPermission needs there but ignores.
+          FsPermission.valueOf("-" + (String) object.get(PERMISSIONS_TAG)));
+    }
+  }
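+
+  /*
+   * Illustrative round trip through the serializer above (values are
+   * hypothetical; the exact field order in the JSON text may vary):
+   *
+   *   PermissionStatus in = new PermissionStatus("user1", "group1",
+   *       FsPermission.valueOf("-rwxr-xr-x"));
+   *   String json = PERMISSION_JSON_SERIALIZER.toJSON(in);
+   *   // json is roughly {"owner":"user1","group":"group1",
+   *   //                  "permissions":"rwxr-xr-x"}
+   *   PermissionStatus out =
+   *       PermissionStatusJsonSerializer.fromJSONString(json);
+   */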
+
+  @VisibleForTesting
+  void setAzureStorageInteractionLayer(StorageInterface storageInteractionLayer) {
+    this.storageInteractionLayer = storageInteractionLayer;
+  }
+
+  /**
+   * Check if concurrent reads and writes on the same blob are allowed.
+   *
+   * @return true if concurrent reads and OOB writes have been configured,
+   *         false otherwise.
+   */
+  private boolean isConcurrentOOBAppendAllowed() {
+    return tolerateOobAppends;
+  }
+
+  /**
+   * Captures the URI and configuration object necessary to create a storage
+   * session with Azure. It parses the scheme to ensure it matches the storage
+   * protocol supported by this file system.
+   *
+   * @param uri
+   *          - URI for target storage blob.
+   * @param conf
+   *          - reference to configuration object.
+   *
+   * @throws IllegalArgumentException
+   *           if URI or job object is null, or scheme is invalid.
+   */
+  @Override
+  public void initialize(URI uri, Configuration conf) throws AzureException {
+
+    if (null == this.storageInteractionLayer) {
+      this.storageInteractionLayer = new StorageInterfaceImpl();
+    }
+
+    // Check that URI exists.
+    //
+    if (null == uri) {
+      throw new IllegalArgumentException(
+          "Cannot initialize WASB file system, URI is null");
+    }
+
+    // Check that configuration object is non-null.
+    //
+    if (null == conf) {
+      throw new IllegalArgumentException(
+          "Cannot initialize WASB file system, configuration is null");
+    }
+
+    // Incoming parameters validated. Capture the URI and the job
+    // configuration object.
+    //
+    sessionUri = uri;
+    sessionConfiguration = conf;
+
+    // Start an Azure storage session.
+    //
+    createAzureStorageSession();
+  }
+
+  /**
+   * Method to extract the account name from an Azure URI.
+   *
+   * @param uri
+   *          -- WASB blob URI
+   * @return accountName -- the account name for the URI.
+   * @throws URISyntaxException
+   *           if the URI does not have an authority, it is badly formed.
+   */
+  private String getAccountFromAuthority(URI uri) throws URISyntaxException {
+
+    // Check to make sure that the authority is valid for the URI.
+    //
+    String authority = uri.getRawAuthority();
+    if (null == authority) {
+      // Badly formed or illegal URI.
+      //
+      throw new URISyntaxException(uri.toString(),
+          "Expected URI with a valid authority");
+    }
+
+    // Check whether the authority contains the delimiter separating the
+    // account name from the container.
+    //
+    if (!authority.contains(WASB_AUTHORITY_DELIMITER)) {
+      return authority;
+    }
+
+    // Split off the container name and the authority.
+    //
+    String[] authorityParts = authority.split(WASB_AUTHORITY_DELIMITER, 2);
+
+    // Because the string contains an '@' delimiter, a container must be
+    // specified.
+    //
+    if (authorityParts.length < 2 || "".equals(authorityParts[0])) {
+      // Badly formed WASB authority since there is no container.
+      //
+      final String errMsg = String
+          .format(
+              "URI '%s' has a malformed WASB authority, expected container name. "
+                  + "Authority takes the form wasb://[<container name>@]<account name>",
+              uri.toString());
+      throw new IllegalArgumentException(errMsg);
+    }
+
+    // Return with the account name. It is possible that this name is NULL.
+    //
+    return authorityParts[1];
+  }
+
+  /**
+   * Method to extract the container name from an Azure URI.
+   *
+   * @param uri
+   *          -- WASB blob URI
+   * @return containerName -- the container name for the URI. May be null.
+   * @throws URISyntaxException
+   *           if the URI does not have an authority, it is badly formed.
+   */
+  private String getContainerFromAuthority(URI uri) throws URISyntaxException {
+
+    // Check to make sure that the authority is valid for the URI.
+    //
+    String authority = uri.getRawAuthority();
+    if (null == authority) {
+      // Badly formed or illegal URI.
+      //
+      throw new URISyntaxException(uri.toString(),
+          "Expected URI with a valid authority");
+    }
+
+    // The URI has a valid authority. Extract the container name. It is the
+    // first component of the WASB URI authority.
+    if (!authority.contains(WASB_AUTHORITY_DELIMITER)) {
+      // The authority does not have a container name. Use the default
+      // container by setting the container name to the default Azure root
+      // container.
+      //
+      return AZURE_ROOT_CONTAINER;
+    }
+
+    // Split off the container name and the authority.
+    String[] authorityParts = authority.split(WASB_AUTHORITY_DELIMITER, 2);
+
+    // Because the string contains an '@' delimiter, a container must be
+    // specified.
+    if (authorityParts.length < 2 || "".equals(authorityParts[0])) {
+      // Badly formed WASB authority since there is no container.
+      final String errMsg = String
+          .format(
+              "URI '%s' has a malformed WASB authority, expected container name. "
+                  + "Authority takes the form wasb://[<container name>@]<account name>",
+              uri.toString());
+      throw new IllegalArgumentException(errMsg);
+    }
+
+    // Set the container name from the first entry of the split parts of the
+    // authority.
+    return authorityParts[0];
+  }
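+
+  /*
+   * Example of how the two helpers above decompose a WASB URI (names are
+   * illustrative):
+   *
+   *   wasb://mycontainer@myaccount.blob.core.windows.net/dir/file
+   *     getAccountFromAuthority   -> "myaccount.blob.core.windows.net"
+   *     getContainerFromAuthority -> "mycontainer"
+   *
+   *   wasb://myaccount.blob.core.windows.net/dir/file   (no '@' delimiter)
+   *     getAccountFromAuthority   -> "myaccount.blob.core.windows.net"
+   *     getContainerFromAuthority -> "$root"
+   */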
+
+  /**
+   * Return the appropriate scheme for communicating with Azure depending on
+   * whether wasb or wasbs is specified in the target URI.
+   *
+   * @return scheme - HTTPS or HTTP as appropriate.
+   */
+  private String getHTTPScheme() {
+    String sessionScheme = sessionUri.getScheme();
+    // Check if we're on a secure URI scheme: wasbs or the legacy asvs scheme.
+    if (sessionScheme != null
+        && (sessionScheme.equalsIgnoreCase("asvs") || sessionScheme
+            .equalsIgnoreCase("wasbs"))) {
+      return HTTPS_SCHEME;
+    } else {
+      // At this point the scheme should be either null or asv or wasb.
+      // Intentionally I'm not going to validate it though since I don't feel
+      // it's this method's job to ensure a valid URI scheme for this file
+      // system.
+      return HTTP_SCHEME;
+    }
+  }
+
+  /**
+   * Set the configuration parameters for this client storage session with
+   * Azure.
+   *
+   * @throws AzureException
+   */
+  private void configureAzureStorageSession() throws AzureException {
+
+    // Assertion: Target session URI already should have been captured.
+    if (sessionUri == null) {
+      throw new AssertionError(
+          "Expected a non-null session URI when configuring storage session");
+    }
+
+    // Assertion: A client session already should have been established with
+    // Azure.
+    if (storageInteractionLayer == null) {
+      throw new AssertionError(String.format(
+          "Cannot configure storage session for URI '%s' "
+              + "if storage session has not been established.",
+          sessionUri.toString()));
+    }
+
+    // Determine whether or not reads are allowed concurrent with OOB writes.
+    tolerateOobAppends = sessionConfiguration.getBoolean(
+        KEY_READ_TOLERATE_CONCURRENT_APPEND,
+        DEFAULT_READ_TOLERATE_CONCURRENT_APPEND);
+
+    // Retrieve configuration for the minimum stream read and write block size.
+    //
+    this.downloadBlockSizeBytes = sessionConfiguration.getInt(
+        KEY_STREAM_MIN_READ_SIZE, DEFAULT_DOWNLOAD_BLOCK_SIZE);
+    this.uploadBlockSizeBytes = sessionConfiguration.getInt(
+        KEY_WRITE_BLOCK_SIZE, DEFAULT_UPLOAD_BLOCK_SIZE);
+
+    // The job may want to specify a timeout to use when engaging the
+    // storage service. The default is currently 90 seconds. It may
+    // be necessary to increase this value for long latencies in larger
+    // jobs. If the timeout specified is greater than zero seconds use
+    // it, otherwise use the default service client timeout.
+    int storageConnectionTimeout = sessionConfiguration.getInt(
+        KEY_STORAGE_CONNECTION_TIMEOUT, 0);
+
+    if (0 < storageConnectionTimeout) {
+      storageInteractionLayer.setTimeoutInMs(storageConnectionTimeout * 1000);
+    }
+
+    // Set the concurrency values equal to that specified in the
+    // configuration file. If it does not exist, set it to the default
+    // value calculated as double the number of CPU cores on the client
+    // machine. The concurrency value is the minimum of double the cores and
+    // the read/write property.
+    int cpuCores = 2 * Runtime.getRuntime().availableProcessors();
+
+    concurrentWrites = sessionConfiguration.getInt(
+        KEY_CONCURRENT_CONNECTION_VALUE_OUT,
+        Math.min(cpuCores, DEFAULT_CONCURRENT_WRITES));
+
+    // Set up the exponential retry policy.
+    minBackoff = sessionConfiguration.getInt(KEY_MIN_BACKOFF_INTERVAL,
+        DEFAULT_MIN_BACKOFF_INTERVAL);
+
+    maxBackoff = sessionConfiguration.getInt(KEY_MAX_BACKOFF_INTERVAL,
+        DEFAULT_MAX_BACKOFF_INTERVAL);
+
+    deltaBackoff = sessionConfiguration.getInt(KEY_BACKOFF_INTERVAL,
+        DEFAULT_BACKOFF_INTERVAL);
+
+    maxRetries = sessionConfiguration.getInt(KEY_MAX_IO_RETRIES,
+        DEFAULT_MAX_RETRY_ATTEMPTS);
+
+    storageInteractionLayer.setRetryPolicyFactory(new RetryExponentialRetry(
+        minBackoff, deltaBackoff, maxBackoff, maxRetries));
+
+    // Read the self-throttling config.
+    selfThrottlingEnabled = sessionConfiguration.getBoolean(
+        KEY_SELF_THROTTLE_ENABLE, DEFAULT_SELF_THROTTLE_ENABLE);
+
+    selfThrottlingReadFactor = sessionConfiguration.getFloat(
+        KEY_SELF_THROTTLE_READ_FACTOR, DEFAULT_SELF_THROTTLE_READ_FACTOR);
+
+    selfThrottlingWriteFactor = sessionConfiguration.getFloat(
+        KEY_SELF_THROTTLE_WRITE_FACTOR, DEFAULT_SELF_THROTTLE_WRITE_FACTOR);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(String
+          .format(
+              "AzureNativeFileSystemStore init. Settings=%d,%b,%d,{%d,%d,%d,%d},{%b,%f,%f}",
+              concurrentWrites, tolerateOobAppends,
+              ((storageConnectionTimeout > 0) ? storageConnectionTimeout
+                  : STORAGE_CONNECTION_TIMEOUT_DEFAULT), minBackoff,
+              deltaBackoff, maxBackoff, maxRetries, selfThrottlingEnabled,
+              selfThrottlingReadFactor, selfThrottlingWriteFactor));
+    }
+  }
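+
+  /*
+   * Illustrative sketch (not part of the original change): the retry and
+   * self-throttling behavior configured above can be overridden the same
+   * way, e.g. with hypothetical values:
+   *
+   *   conf.setInt("fs.azure.io.retry.min.backoff.interval", 3 * 1000);
+   *   conf.setInt("fs.azure.io.retry.max.backoff.interval", 90 * 1000);
+   *   conf.setInt("fs.azure.io.retry.backoff.interval", 3 * 1000);
+   *   conf.setInt("fs.azure.io.retry.max.retries", 30);
+   *   conf.setBoolean("fs.azure.selfthrottling.enable", false);
+   */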
+
+  /**
+   * Connect to Azure storage using anonymous credentials.
+   *
+   * @param uri
+   *          - URI to target blob (R/O access to public blob)
+   *
+   * @throws StorageException
+   *           raised on errors communicating with Azure storage.
+   * @throws IOException
+   *           raised on errors performing I/O or setting up the session.
+   * @throws URISyntaxException
+   *           raised on creating malformed URIs.
+   */
+  private void connectUsingAnonymousCredentials(final URI uri)
+      throws StorageException, IOException, URISyntaxException {
+    // Use an HTTP scheme since the URI specifies a publicly accessible
+    // container. Explicitly create a storage URI corresponding to the URI
+    // parameter for use in creating the service client.
+    String accountName = getAccountFromAuthority(uri);
+    URI storageUri = new URI(getHTTPScheme() + ":" + PATH_DELIMITER
+        + PATH_DELIMITER + accountName);
+
+    // Create the service client with anonymous credentials.
+    String containerName = getContainerFromAuthority(uri);
+    storageInteractionLayer.createBlobClient(storageUri);
+    suppressRetryPolicyInClientIfNeeded();
+
+    // Capture the container reference.
+    container = storageInteractionLayer.getContainerReference(containerName);
+    rootDirectory = container.getDirectoryReference("");
+
+    // Check for container existence, and our ability to access it.
+    try {
+      if (!container.exists(getInstrumentedContext())) {
+        throw new AzureException("Container " + containerName + " in account "
+            + accountName + " not found, and we can't create it"
+            + " using anonymous credentials.");
+      }
+    } catch (StorageException ex) {
+      throw new AzureException("Unable to access container " + containerName
+          + " in account " + accountName
+          + " using anonymous credentials, and no credentials found for them"
+          + " in the configuration.", ex);
+    }
+
+    // Accessing the storage server unauthenticated using
+    // anonymous credentials.
+    isAnonymousCredentials = true;
+
+    // Configure Azure storage session.
+    configureAzureStorageSession();
+  }
+
+  private void connectUsingCredentials(String accountName,
+      StorageCredentials credentials, String containerName)
+      throws URISyntaxException, StorageException, AzureException {
+
+    if (isStorageEmulatorAccount(accountName)) {
+      isStorageEmulator = true;
+      CloudStorageAccount account = CloudStorageAccount
+          .getDevelopmentStorageAccount();
+      storageInteractionLayer.createBlobClient(account);
+    } else {
+      URI blobEndPoint = new URI(getHTTPScheme() + "://" + accountName);
+      storageInteractionLayer.createBlobClient(blobEndPoint, credentials);
+    }
+    suppressRetryPolicyInClientIfNeeded();
+
+    // Capture the container reference for debugging purposes.
+    container = storageInteractionLayer.getContainerReference(containerName);
+    rootDirectory = container.getDirectoryReference("");
+
+    // Can only create the container if using account key credentials.
+    canCreateOrModifyContainer = credentials instanceof StorageCredentialsAccountAndKey;
+
+    // Configure Azure storage session.
+    configureAzureStorageSession();
+  }
+
+  /**
+   * Connect to Azure storage using account key credentials.
+   */
+  private void connectUsingConnectionStringCredentials(
+      final String accountName, final String containerName,
+      final String accountKey) throws InvalidKeyException, StorageException,
+      IOException, URISyntaxException {
+    // If the account name is "acc.blob.core.windows.net", then the
+    // rawAccountName is just "acc".
+    String rawAccountName = accountName.split("\\.")[0];
+    StorageCredentials credentials = new StorageCredentialsAccountAndKey(
+        rawAccountName, accountKey);
+    connectUsingCredentials(accountName, credentials, containerName);
+  }
+
+  /**
+   * Connect to Azure storage using shared access signature credentials.
+   */
+  private void connectUsingSASCredentials(final String accountName,
+      final String containerName, final String sas) throws InvalidKeyException,
+      StorageException, IOException, URISyntaxException {
+    StorageCredentials credentials = new StorageCredentialsSharedAccessSignature(
+        sas);
+    connectingUsingSAS = true;
+    connectUsingCredentials(accountName, credentials, containerName);
+  }
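+
+  /*
+   * A container-level SAS is picked up from configuration (see
+   * createAzureStorageSession below) under a property of the form
+   * "fs.azure.sas.<container>.<account>", e.g. for hypothetical names:
+   *
+   *   conf.set("fs.azure.sas.mycontainer.myaccount.blob.core.windows.net",
+   *       "<SAS query string>");
+   */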
+
+  private boolean isStorageEmulatorAccount(final String accountName) {
+    return accountName.equalsIgnoreCase(sessionConfiguration.get(
+        STORAGE_EMULATOR_ACCOUNT_NAME_PROPERTY_NAME,
+        DEFAULT_STORAGE_EMULATOR_ACCOUNT_NAME));
+  }
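+
+  /*
+   * Sketch: a URI is routed to the local storage emulator when its account
+   * name matches the configured emulator account name, e.g.:
+   *
+   *   conf.set("fs.azure.storage.emulator.account.name", "storageemulator");
+   *   // ...then open paths such as wasb://mycontainer@storageemulator/dir/f
+   */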
+
+  static String getAccountKeyFromConfiguration(String accountName,
+      Configuration conf) throws KeyProviderException {
+    String key = null;
+    String keyProviderClass = conf.get(KEY_ACCOUNT_KEYPROVIDER_PREFIX
+        + accountName);
+    KeyProvider keyProvider = null;
+
+    if (keyProviderClass == null) {
+      // No key provider was provided, so use the configured key as-is.
+      keyProvider = new SimpleKeyProvider();
+    } else {
+      // Create an instance of the key provider class and verify it
+      // implements KeyProvider.
+      Object keyProviderObject = null;
+      try {
+        Class<?> clazz = conf.getClassByName(keyProviderClass);
+        keyProviderObject = clazz.newInstance();
+      } catch (Exception e) {
+        throw new KeyProviderException("Unable to load key provider class.", e);
+      }
+      if (!(keyProviderObject instanceof KeyProvider)) {
+        throw new KeyProviderException(keyProviderClass
+            + " specified in config is not a valid KeyProvider class.");
+      }
+      keyProvider = (KeyProvider) keyProviderObject;
+    }
+    key = keyProvider.getStorageAccountKey(accountName, conf);
+
+    return key;
+  }
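+
+  /*
+   * Sketch of supplying the account key (class name hypothetical). With no
+   * key provider configured, the default SimpleKeyProvider reads the key
+   * directly from configuration; alternatively a custom provider can be
+   * plugged in per account:
+   *
+   *   conf.set(
+   *       "fs.azure.account.keyprovider.myaccount.blob.core.windows.net",
+   *       "com.example.MyKeyProvider"); // must implement KeyProvider
+   */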
+
+  /**
+   * Establish a session with Azure blob storage based on the target URI. The
+   * method determines whether or not the URI target contains an explicit
+   * account or an implicit default cluster-wide account.
+   *
+   * @throws AzureException
+   * @throws IOException
+   */
+  private void createAzureStorageSession() throws AzureException {
+
+    // Make sure this object was properly initialized with references to
+    // the sessionUri and sessionConfiguration.
+    if (null == sessionUri || null == sessionConfiguration) {
+      throw new AzureException("Filesystem object not initialized properly. "
+          + "Unable to start session with Azure Storage server.");
+    }
+
+    // File system object initialized, attempt to establish a session
+    // with the Azure storage service for the target URI string.
+    try {
+      // Inspect the URI authority to determine the account and use the
+      // account to start an Azure blob client session using an account key
+      // for the account or anonymously.
+      // For all URIs do the following checks in order:
+      // 1. Validate that <account> can be used with the current Hadoop
+      // cluster by checking it exists in the list of configured accounts
+      // for the cluster.
+      // 2. Look up the AccountKey in the list of configured accounts for the
+      // cluster.
+      // 3. If there is no AccountKey, assume anonymous public blob access
+      // when accessing the blob.
+      //
+      // If the URI does not specify a container use the default root
+      // container under the account name.
+
+      // Assertion: Container name on the session Uri should be non-null.
+      if (getContainerFromAuthority(sessionUri) == null) {
+        throw new AssertionError(String.format(
+            "Non-null container expected from session URI: %s.",
+            sessionUri.toString()));
+      }
+
+      // Get the account name.
+      String accountName = getAccountFromAuthority(sessionUri);
+      if (null == accountName) {
+        // Account name is not specified as part of the URI. Throw indicating
+        // an invalid account name.
+        final String errMsg = String.format(
+            "Cannot load WASB file system: account name not"
+                + " specified in URI: %s.", sessionUri.toString());
+        throw new AzureException(errMsg);
+      }
+
+      String containerName = getContainerFromAuthority(sessionUri);
+
+      // Check whether this is a storage emulator account.
+      if (isStorageEmulatorAccount(accountName)) {
+        // It is an emulator account, connect to it with no credentials.
+        connectUsingCredentials(accountName, null, containerName);
+        return;
+      }
+
+      // Check whether we have a shared access signature for that container.
+      String propertyValue = sessionConfiguration.get(KEY_ACCOUNT_SAS_PREFIX
+          + containerName + "." + accountName);
+      if (propertyValue != null) {
+        // SAS was found. Connect using that.
+        connectUsingSASCredentials(accountName, containerName, propertyValue);
+        return;
+      }
+
+      // Check whether the account is configured with an account key.
+      propertyValue = getAccountKeyFromConfiguration(accountName,
+          sessionConfiguration);
+      if (propertyValue != null) {
+
+        // Account key was found.
+        // Create the Azure storage session using the account key and
+        // container.
+        connectUsingConnectionStringCredentials(
+            getAccountFromAuthority(sessionUri),
+            getContainerFromAuthority(sessionUri), propertyValue);
+
+        // Return to caller
+        return;
+      }
+
+      // The account access is not configured for this cluster. Try anonymous
+      // access.
+      connectUsingAnonymousCredentials(sessionUri);
+
+    } catch (Exception e) {
+      // Caught exception while attempting to initialize the Azure File
+      // System store, re-throw the exception.
+      throw new AzureException(e);
+    }
+  }
+
+  private enum ContainerState {
+    /**
+     * We haven't checked the container state yet.
+     */
+    Unknown,
+    /**
+     * We checked and the container doesn't exist.
+     */
+    DoesntExist,
+    /**
+     * The container exists and doesn't have a WASB version stamp on it.
+     */
+    ExistsNoVersion,
+    /**
+     * The container exists and has an unsupported WASB version stamped on it.
+     */
+    ExistsAtWrongVersion,
+    /**
+     * The container exists and has the proper WASB version stamped on it.
+     */
+    ExistsAtRightVersion
+  }
+
+  private enum ContainerAccessType {
+    /**
+     * We're accessing the container for a pure read operation, e.g. read a
+     * file.
+     */
+    PureRead,
+    /**
+     * We're accessing the container purely to write something, e.g. write a
+     * file.
+     */
+    PureWrite,
+    /**
+     * We're accessing the container to read something then write, e.g. rename
+     * a file.
+     */
+    ReadThenWrite
+  }
+
+  /**
+   * This should be called from any method that does any modifications to the
+   * underlying container: it makes sure to put the current WASB version in
+   * the container's metadata if it's not already there.
+   */
+  private ContainerState checkContainer(ContainerAccessType accessType)
+      throws StorageException, AzureException {
+    synchronized (containerStateLock) {
+      if (isOkContainerState(accessType)) {
+        return currentKnownContainerState;
+      }
+      if (currentKnownContainerState == ContainerState.ExistsAtWrongVersion) {
+        String containerVersion = retrieveVersionAttribute(container);
+        throw wrongVersionException(containerVersion);
+      }
+      // This means I didn't check it before or it didn't exist or
+      // we need to stamp the version. Since things may have changed by
+      // other machines since then, do the check again and don't depend
+      // on past information.
+
+      // Sanity check: we don't expect this at this point.
+      if (currentKnownContainerState == ContainerState.ExistsAtRightVersion) {
+        throw new AssertionError("Unexpected state: "
+            + currentKnownContainerState);
+      }
+
+      // Download the attributes - doubles as an existence check with just
+      // one service call.
+      try {
+        container.downloadAttributes(getInstrumentedContext());
+        currentKnownContainerState = ContainerState.Unknown;
+      } catch (StorageException ex) {
+        if (ex.getErrorCode().equals(
+            StorageErrorCode.RESOURCE_NOT_FOUND.toString())) {
+          currentKnownContainerState = ContainerState.DoesntExist;
+        } else {
+          throw ex;
+        }
+      }
+
+      if (currentKnownContainerState == ContainerState.DoesntExist) {
+        // If the container doesn't exist and we intend to write to it,
+        // create it now.
+        if (needToCreateContainer(accessType)) {
+          storeVersionAttribute(container);
+          container.create(getInstrumentedContext());
+          currentKnownContainerState = ContainerState.ExistsAtRightVersion;
+        }
+      } else {
+        // The container exists, check the version.
+        String containerVersion = retrieveVersionAttribute(container);
+        if (containerVersion != null) {
+          if (containerVersion.equals(FIRST_WASB_VERSION)) {
+            // It's the version from when WASB was called ASV, just
+            // fix the version attribute if needed and proceed.
+            // We should be good otherwise.
+            if (needToStampVersion(accessType)) {
+              storeVersionAttribute(container);
+              container.uploadMetadata(getInstrumentedContext());
+            }
+          } else if (!containerVersion.equals(CURRENT_WASB_VERSION)) {
+            // Don't know this version - throw.
+            currentKnownContainerState = ContainerState.ExistsAtWrongVersion;
+            throw wrongVersionException(containerVersion);
+          } else {
+            // It's our correct version.
+            currentKnownContainerState = ContainerState.ExistsAtRightVersion;
+          }
+        } else {
+          // No version info exists.
+          currentKnownContainerState = ContainerState.ExistsNoVersion;
+          if (needToStampVersion(accessType)) {
+            // Need to stamp the version.
+            storeVersionAttribute(container);
+            container.uploadMetadata(getInstrumentedContext());
+            currentKnownContainerState = ContainerState.ExistsAtRightVersion;
+          }
+        }
+      }
+      return currentKnownContainerState;
+    }
+  }
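+
+  /*
+   * Illustrative walkthrough of the state machine above for a first
+   * PureWrite against a container that does not exist yet:
+   *
+   *   Unknown -> downloadAttributes fails with RESOURCE_NOT_FOUND
+   *           -> DoesntExist -> create() with the version stamped
+   *           -> ExistsAtRightVersion (cached for subsequent calls)
+   *
+   * A PureRead against the same missing container stays at DoesntExist and
+   * creates nothing.
+   */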
+
+  private AzureException wrongVersionException(String containerVersion) {
+    return new AzureException("The container " + container.getName()
+        + " is at an unsupported version: " + containerVersion
+        + ". Current supported version: " + CURRENT_WASB_VERSION);
+  }
+
+  private boolean needToStampVersion(ContainerAccessType accessType) {
+    // We need to stamp the version on the container any time we write to
+    // it and we have the correct credentials to be able to write container
+    // metadata.
+    return accessType != ContainerAccessType.PureRead
+        && canCreateOrModifyContainer;
+  }
+
+  private static boolean needToCreateContainer(ContainerAccessType accessType) {
+    // We need to pro-actively create the container (if it doesn't exist) if
+    // we're doing a pure write. No need to create it for pure read or read-
+    // then-write access.
+    return accessType == ContainerAccessType.PureWrite;
+  }
+
+  // Determines whether we have to pull the container information again
+  // or we can work based off what we already have.
+  private boolean isOkContainerState(ContainerAccessType accessType) {
+    switch (currentKnownContainerState) {
+    case Unknown:
+      // When using SAS, we can't discover container attributes
+      // so just live with the Unknown state and fail later if it
+      // doesn't exist.
+      return connectingUsingSAS;
+    case DoesntExist:
+      return false; // the container could have been created
+    case ExistsAtRightVersion:
+      return true; // fine to optimize
+    case ExistsAtWrongVersion:
+      return false;
+    case ExistsNoVersion:
+      // If there's no version, it's OK if we don't need to stamp the version
+      // or we can't anyway even if we wanted to.
+      return !needToStampVersion(accessType);
+    default:
+      throw new AssertionError("Unknown container state: "
+          + currentKnownContainerState);
+    }
+  }
+
+  private boolean getUseTransactionalContentMD5() {
+    return sessionConfiguration.getBoolean(KEY_CHECK_BLOCK_MD5, true);
+  }
+
+  private BlobRequestOptions getUploadOptions() {
+    BlobRequestOptions options = new BlobRequestOptions();
+    options.setStoreBlobContentMD5(sessionConfiguration.getBoolean(
+        KEY_STORE_BLOB_MD5, false));
+    options.setUseTransactionalContentMD5(getUseTransactionalContentMD5());
+    options.setConcurrentRequestCount(concurrentWrites);
+
+    options.setRetryPolicyFactory(new RetryExponentialRetry(minBackoff,
+        deltaBackoff, maxBackoff, maxRetries));
+
+    return options;
+  }
+
+  private BlobRequestOptions getDownloadOptions() {
+    BlobRequestOptions options = new BlobRequestOptions();
+    options.setRetryPolicyFactory(new RetryExponentialRetry(minBackoff,
+        deltaBackoff, maxBackoff, maxRetries));
+    options.setUseTransactionalContentMD5(getUseTransactionalContentMD5());
+    return options;
+  }
+
+  @Override
+  public DataOutputStream storefile(String key,
+      PermissionStatus permissionStatus) throws AzureException {
+    try {
+
+      // Check if a session exists, if not create a session with the
+      // Azure storage server.
+      if (null == storageInteractionLayer) {
+        final String errMsg = String.format(
+            "Storage session expected for URI '%s' but does not exist.",
+            sessionUri);
+        throw new AzureException(errMsg);
+      }
+
+      // Check if there is an authenticated account associated with this
+      // instance of the WASB file system. If not, the file system has not
+      // been authenticated and all access is anonymous.
+      if (!isAuthenticatedAccess()) {
+        // Preemptively raise an exception indicating no uploads are
+        // allowed to anonymous accounts.
+        throw new AzureException(new IOException(
+            "Uploads to public accounts using anonymous "
+                + "access are prohibited."));
+      }
+
+      checkContainer(ContainerAccessType.PureWrite);
+
+      /**
+       * Note: Windows Azure Blob Storage does not allow the creation of
+       * arbitrary directory paths under the default $root directory. This is
+       * by design to eliminate ambiguity in specifying an implicit blob
+       * address. A blob in the $root container cannot include a '/' in its
+       * name, and care must be taken not to include a trailing '/' when
+       * referencing blobs in the $root container. A '/' in the $root
+       * container permits ambiguous blob names, as in the following example
+       * involving two containers, $root and mycontainer:
+       * http://myaccount.blob.core.windows.net/$root
+       * http://myaccount.blob.core.windows.net/mycontainer If the URL
+       * "mycontainer/somefile.txt" were allowed in $root then the URL:
+       * http://myaccount.blob.core.windows.net/mycontainer/myblob.txt could
+       * mean either: (1) container=mycontainer; blob=myblob.txt (2)
+       * container=$root; blob=mycontainer/myblob.txt
+       *
+       * To avoid this type of ambiguity Azure blob storage prevents
+       * arbitrary paths under $root. For a simple and more consistent user
+       * experience it was decided to eliminate the opportunity for creating
+       * such paths by making the $root container read-only under WASB.
+       */
+
+      // Check that no attempt is made to write to blobs on default
+      // $root containers.
+      if (AZURE_ROOT_CONTAINER.equals(getContainerFromAuthority(sessionUri))) {
+        // Azure containers are restricted to non-root containers.
+        final String errMsg = String.format(
+            "Writes to '%s' container for URI '%s' are prohibited, "
+                + "only updates on non-root containers permitted.",
+            AZURE_ROOT_CONTAINER, sessionUri.toString());
+        throw new AzureException(errMsg);
+      }
+
+      // Get the block blob reference from the store's container and
+      // return it.
+      CloudBlockBlobWrapper blob = getBlobReference(key);
+      storePermissionStatus(blob, permissionStatus);
+
+      // Create the output stream for the Azure blob.
+      OutputStream outputStream = blob.openOutputStream(getUploadOptions(),
+          getInstrumentedContext());
+
+      // Return to caller with DataOutput stream.
+      DataOutputStream dataOutStream = new DataOutputStream(outputStream);
+      return dataOutStream;
+    } catch (Exception e) {
+      // Caught exception while attempting to open the blob output stream.
+      // Re-throw as an Azure storage exception.
+      throw new AzureException(e);
+    }
+  }
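+
+  /*
+   * Sketched use of storefile by the file system layer (names hypothetical):
+   *
+   *   DataOutputStream out = store.storefile("dir/file",
+   *       new PermissionStatus("user1", "group1", FsPermission.getDefault()));
+   *   out.write(data);  // buffered and uploaded in blocks of
+   *                     // uploadBlockSizeBytes
+   *   out.close();      // completes the upload, making the blob visible
+   */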
+
+  /**
+   * Default permission to use when no permission metadata is found.
+   *
+   * @return The default permission to use.
+   */
+  private static PermissionStatus defaultPermissionNoBlobMetadata() {
+    return new PermissionStatus("", "", FsPermission.getDefault());
+  }
+
+  private static void storeMetadataAttribute(CloudBlockBlobWrapper blob,
+      String key, String value) {
+    HashMap<String, String> metadata = blob.getMetadata();
+    if (null == metadata) {
+      metadata = new HashMap<String, String>();
+    }
+    metadata.put(key, value);
+    blob.setMetadata(metadata);
+  }
+
+  private static String getMetadataAttribute(CloudBlockBlobWrapper blob,
+      String... keyAlternatives) {
+    HashMap<String, String> metadata = blob.getMetadata();
+    if (null == metadata) {
+      return null;
+    }
+    for (String key : keyAlternatives) {
+      if (metadata.containsKey(key)) {
+        return metadata.get(key);
+      }
+    }
+    return null;
+  }
+
+  private static void removeMetadataAttribute(CloudBlockBlobWrapper blob,
+      String key) {
+    HashMap<String, String> metadata = blob.getMetadata();
+    if (metadata != null) {
+      metadata.remove(key);
+      blob.setMetadata(metadata);
+    }
+  }
+
+  private void storePermissionStatus(CloudBlockBlobWrapper blob,
+      PermissionStatus permissionStatus) {
+    storeMetadataAttribute(blob, PERMISSION_METADATA_KEY,
+        PERMISSION_JSON_SERIALIZER.toJSON(permissionStatus));
+    // Remove the old metadata key if present.
+    removeMetadataAttribute(blob, OLD_PERMISSION_METADATA_KEY);
+  }
+
+  private PermissionStatus getPermissionStatus(CloudBlockBlobWrapper blob) {
+    String permissionMetadataValue = getMetadataAttribute(blob,
+        PERMISSION_METADATA_KEY, OLD_PERMISSION_METADATA_KEY);
+    if (permissionMetadataValue != null) {
+      return PermissionStatusJsonSerializer
+          .fromJSONString(permissionMetadataValue);
+    } else {
+      return defaultPermissionNoBlobMetadata();
+    }
+  }
+
+  private static void storeFolderAttribute(CloudBlockBlobWrapper blob) {
+    storeMetadataAttribute(blob, IS_FOLDER_METADATA_KEY, "true");
+    // Remove the old metadata key if present.
+    removeMetadataAttribute(blob, OLD_IS_FOLDER_METADATA_KEY);
+  }
+
+  private static void storeLinkAttribute(CloudBlockBlobWrapper blob,
+      String linkTarget) {
+    storeMetadataAttribute(blob, LINK_BACK_TO_UPLOAD_IN_PROGRESS_METADATA_KEY,
+        linkTarget);
+    // Remove the old metadata key if present.
+    removeMetadataAttribute(blob,
+        OLD_LINK_BACK_TO_UPLOAD_IN_PROGRESS_METADATA_KEY);
+  }
+
+  private static String getLinkAttributeValue(CloudBlockBlobWrapper blob) {
+    return getMetadataAttribute(blob,
+        LINK_BACK_TO_UPLOAD_IN_PROGRESS_METADATA_KEY,
+        OLD_LINK_BACK_TO_UPLOAD_IN_PROGRESS_METADATA_KEY);
+  }
+
+  private static boolean retrieveFolderAttribute(CloudBlockBlobWrapper blob) {
+    HashMap<String, String> metadata = blob.getMetadata();
+    return null != metadata
+        && (metadata.containsKey(IS_FOLDER_METADATA_KEY) || metadata
+            .containsKey(OLD_IS_FOLDER_METADATA_KEY));
+  }
+
+  private static void storeVersionAttribute(CloudBlobContainerWrapper container) {
+    HashMap<String, String> metadata = container.getMetadata();
+    if (null == metadata) {
+      metadata = new HashMap<String, String>();
+    }
+    metadata.put(VERSION_METADATA_KEY, CURRENT_WASB_VERSION);
+    if (metadata.containsKey(OLD_VERSION_METADATA_KEY)) {
+      metadata.remove(OLD_VERSION_METADATA_KEY);
+    }
+    container.setMetadata(metadata);
+  }
+
+  private static String retrieveVersionAttribute(
+      CloudBlobContainerWrapper container) {
+    HashMap<String, String> metadata = container.getMetadata();
+    if (metadata == null) {
+      return null;
+    } else if (metadata.containsKey(VERSION_METADATA_KEY)) {
+      return metadata.get(VERSION_METADATA_KEY);
+    } else if (metadata.containsKey(OLD_VERSION_METADATA_KEY)) {
+      return metadata.get(OLD_VERSION_METADATA_KEY);
+    } else {
+      return null;
+    }
+  }
+
+  @Override
+  public void storeEmptyFolder(String key, PermissionStatus permissionStatus)
+      throws AzureException {
+
+    if (null == storageInteractionLayer) {
+      final String errMsg = String.format(
+          "Storage session expected for URI '%s' but does not exist.",
+          sessionUri);
+      throw new AssertionError(errMsg);
+    }
+
+    // Check if there is an authenticated account associated with this
+    // instance of the WASB file system. If not, the file system has not
+    // been authenticated and all access is anonymous.
+    if (!isAuthenticatedAccess()) {
+      // Preemptively raise an exception indicating no uploads are
+      // allowed to anonymous accounts.
+      throw new AzureException(
+          "Uploads to public accounts using anonymous access are prohibited.");
+    }
+
+    try {
+      checkContainer(ContainerAccessType.PureWrite);
+
+      CloudBlockBlobWrapper blob = getBlobReference(key);
+      storePermissionStatus(blob, permissionStatus);
+      storeFolderAttribute(blob);
+      blob.upload(new ByteArrayInputStream(new byte[0]),
+          getInstrumentedContext());
+    } catch (Exception e) {
+      // Caught exception while attempting upload. Re-throw as an Azure
+      // storage exception.
+      throw new AzureException(e);
+    }
+  }
+
+  /**
+   * Stores an empty blob that links to the temporary file where we're
+   * uploading the initial data.
+   */
+  @Override
+  public void storeEmptyLinkFile(String key, String tempBlobKey,
+      PermissionStatus permissionStatus) throws AzureException {
+    if (null == storageInteractionLayer) {
+      final String errMsg = String.format(
+          "Storage session expected for URI '%s' but does not exist.",
+          sessionUri);
+      throw new AssertionError(errMsg);
+    }
+    // Check if there is an authenticated account associated with this
+    // instance of the WASB file system. If not, the file system has not
+    // been authenticated and all access is anonymous.
+    if (!isAuthenticatedAccess()) {
+      // Preemptively raise an exception indicating no uploads are
+      // allowed to anonymous accounts.
+      throw new AzureException(
+          "Uploads to public accounts using anonymous access are prohibited.");
+    }
+
+    try {
+      checkContainer(ContainerAccessType.PureWrite);
+
+      CloudBlockBlobWrapper blob = getBlobReference(key);
+      storePermissionStatus(blob, permissionStatus);
+      storeLinkAttribute(blob, tempBlobKey);
+      blob.upload(new ByteArrayInputStream(new byte[0]),
+          getInstrumentedContext());
+    } catch (Exception e) {
+      // Caught exception while attempting upload. Re-throw as an Azure
+      // storage exception.
+      throw new AzureException(e);
+    }
+  }
+
+  /**
+   * If the blob with the given key exists and has a link in its metadata to a
+   * temporary file (see storeEmptyLinkFile), this method returns the key to
+   * that temporary file. Otherwise, returns null.
+   */
+  @Override
+  public String getLinkInFileMetadata(String key) throws AzureException {
+    if (null == storageInteractionLayer) {
+      final String errMsg = String.format(
+          "Storage session expected for URI '%s' but does not exist.",
+          sessionUri);
+      throw new AssertionError(errMsg);
+    }
+
+    try {
+      checkContainer(ContainerAccessType.PureRead);
+
+      CloudBlockBlobWrapper blob = getBlobReference(key);
+      blob.downloadAttributes(getInstrumentedContext());
+      return getLinkAttributeValue(blob);
+    } catch (Exception e) {
+      // Caught exception while attempting download. Re-throw as an Azure
+      // storage exception.
+      throw new AzureException(e);
+    }
+  }
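+
+  /*
+   * Sketch of the link-blob protocol implemented by storeEmptyLinkFile and
+   * getLinkInFileMetadata (key names hypothetical):
+   *
+   *   // While uploading to a temporary blob, publish an empty marker blob
+   *   // that points back at it:
+   *   store.storeEmptyLinkFile("dir/file", "dir/file.tmp", perms);
+   *   // A reader can then discover the in-progress upload:
+   *   String tempKey = store.getLinkInFileMetadata("dir/file");
+   *   // tempKey == "dir/file.tmp", or null if no link metadata is present
+   */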
+
+  /**
+   * Private method to check for authenticated access.
+   *
+   * @return true if access is credentialed and authenticated, false
+   *         otherwise.
+   */
+  private boolean isAuthenticatedAccess() throws AzureException {
+
+    if (isAnonymousCredentials) {
+      // Access to this storage account is unauthenticated.
+      return false;
+    }
+    // Access is authenticated.
+    return true;
+  }
+
+  /**
+   * This private method uses the root directory or the original container to
+   * list blobs under the directory or container depending on whether the
+   * original file system object was constructed with a short- or long-form
+   * URI. If the root directory is non-null the URI in the file constructor
+   * was in the long form.
+   *
+   * @param includeMetadata
+   *          if set, the listed items will have their metadata populated
+   *          already.
+   *
+   * @return blobItems : iterable collection of blob items.
+   * @throws URISyntaxException
+   *
+   */
+  private Iterable<ListBlobItem> listRootBlobs(boolean includeMetadata)
+      throws StorageException, URISyntaxException {
+    return rootDirectory.listBlobs(
+        null,
+        false,
+        includeMetadata ? EnumSet.of(BlobListingDetails.METADATA) : EnumSet
+            .noneOf(BlobListingDetails.class), null, getInstrumentedContext());
+  }
+
+  /**
+   * This private method uses the root directory or the original container to
+   * list blobs under the directory or container given a specified prefix for
+   * the directory depending on whether the original file system object was
+   * constructed with a short- or long-form URI. If the root directory is
+   * non-null the URI in the file constructor was in the long form.
+   *
+   * @param aPrefix
+   *          : string name representing the prefix of containing blobs.
+   * @param includeMetadata
+   *          if set, the listed items will have their metadata populated
+   *          already.
+   *
+   * @return blobItems : iterable collection of blob items.
+   * @throws URISyntaxException
+   *
+   */
+  private Iterable<ListBlobItem> listRootBlobs(String aPrefix,
+      boolean includeMetadata) throws StorageException, URISyntaxException {
+
+    return rootDirectory.listBlobs(
+        aPrefix,
+        false,
+        includeMetadata ? EnumSet.of(BlobListingDetails.METADATA) : EnumSet
+            .noneOf(BlobListingDetails.class), null, getInstrumentedContext());
+  }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * This private method uses the root directory or the original container to
|
|
|
+ * list blobs under the directory or container given a specified prefix for
|
|
|
+ * the directory depending on whether the original file system object was
|
|
|
+ * constructed with a short- or long-form URI. It also uses the specified flat
|
|
|
+ * or hierarchical option, listing details options, request options, and
|
|
|
+ * operation context.
|
|
|
+ *
|
|
|
+ * @param aPrefix
|
|
|
+ * string name representing the prefix of containing blobs.
|
|
|
+ * @param useFlatBlobListing
|
|
|
+ * - the list is flat if true, or hierarchical otherwise.
|
|
|
+ * @param listingDetails
|
|
|
+ * - determine whether snapshots, metadata, committed/uncommitted
|
|
|
+ * data
|
|
|
+ * @param options
|
|
|
+ * - object specifying additional options for the request. null =
|
|
|
+ * default options
|
|
|
+ * @param opContext
|
|
|
+ * - context of the current operation
|
|
|
+ * @returns blobItems : iterable collection of blob items.
|
|
|
+ * @throws URISyntaxException
|
|
|
+ *
|
|
|
+ */
|
|
|
+ private Iterable<ListBlobItem> listRootBlobs(String aPrefix,
|
|
|
+ boolean useFlatBlobListing, EnumSet<BlobListingDetails> listingDetails,
|
|
|
+ BlobRequestOptions options, OperationContext opContext)
|
|
|
+ throws StorageException, URISyntaxException {
|
|
|
+
|
|
|
+ CloudBlobDirectoryWrapper directory = this.container
|
|
|
+ .getDirectoryReference(aPrefix);
|
|
|
+ return directory.listBlobs(null, useFlatBlobListing, listingDetails,
|
|
|
+ options, opContext);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * This private method uses the root directory or the original container to
|
|
|
+ * get the block blob reference depending on whether the original file system
|
|
|
+ * object was constructed with a short- or long-form URI. If the root
|
|
|
+ * directory is non-null the URI in the file constructor was in the long form.
|
|
|
+ *
|
|
|
+ * @param aKey
|
|
|
+ * : a key used to query Azure for the block blob.
|
|
|
+ * @returns blob : a reference to the Azure block blob corresponding to the
|
|
|
+ * key.
|
|
|
+ * @throws URISyntaxException
|
|
|
+ *
|
|
|
+ */
|
|
|
+ private CloudBlockBlobWrapper getBlobReference(String aKey)
|
|
|
+ throws StorageException, URISyntaxException {
|
|
|
+
|
|
|
+ CloudBlockBlobWrapper blob = this.container.getBlockBlobReference(aKey);
|
|
|
+
|
|
|
+ blob.setStreamMinimumReadSizeInBytes(downloadBlockSizeBytes);
|
|
|
+ blob.setWriteBlockSizeInBytes(uploadBlockSizeBytes);
|
|
|
+
|
|
|
+ // Return with block blob.
|
|
|
+ return blob;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * This private method normalizes the key by stripping the container name from
|
|
|
+ * the path and returns a path relative to the root directory of the
|
|
|
+ * container.
|
|
|
+ *
|
|
|
+ * @param keyUri
|
|
|
+ * - adjust this key to a path relative to the root directory
|
|
|
+ *
|
|
|
+ * @returns normKey
|
|
|
+ */
|
|
|
+ private String normalizeKey(URI keyUri) {
|
|
|
+ String normKey;
|
|
|
+
|
|
|
+ // Strip the container name from the path and return the path
|
|
|
+ // relative to the root directory of the container.
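+    //
+    // For example, a blob URI path normally has the form
+    // /<container>/<dir>/<file>, so splitting on "/" into 3 parts yields
+    // ["", "<container>", "<dir>/<file>"] and the last part is the
+    // container-relative key. Against the storage emulator the account name
+    // also appears in the path (/<account>/<container>/...), hence 4 parts.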
+    int parts = isStorageEmulator ? 4 : 3;
+    normKey = keyUri.getPath().split("/", parts)[(parts - 1)];
+
+    // Return the fixed key.
+    return normKey;
+  }
+
+  /**
+   * This private method normalizes the key by stripping the container name from
+   * the path and returns a path relative to the root directory of the
+   * container.
+   *
+   * @param blob
+   *          adjust the key to this blob to a path relative to the root
+   *          directory.
+   *
+   * @return the normalized key.
+   */
+  private String normalizeKey(CloudBlockBlobWrapper blob) {
+    return normalizeKey(blob.getUri());
+  }
+
+  /**
+   * This private method normalizes the key by stripping the container name from
+   * the path and returns a path relative to the root directory of the
+   * container.
+   *
+   * @param directory
+   *          adjust the key to this directory to a path relative to the root
+   *          directory.
+   *
+   * @return the normalized key.
+   */
+  private String normalizeKey(CloudBlobDirectoryWrapper directory) {
+    String dirKey = normalizeKey(directory.getUri());
+    // Strip the last delimiter.
+    if (dirKey.endsWith(PATH_DELIMITER)) {
+      dirKey = dirKey.substring(0, dirKey.length() - 1);
+    }
+    return dirKey;
+  }
+
+  /**
+   * Default method that creates a new OperationContext for the Azure Storage
+   * operation that has listeners hooked to it that will update the metrics for
+   * this file system. This method does not bind to receive send request
+   * callbacks by default.
+   *
+   * @return The OperationContext object to use.
+   */
+  private OperationContext getInstrumentedContext() {
+    // Default is to not bind to receive send callback events.
+    return getInstrumentedContext(false);
+  }
+
+  /**
+   * Creates a new OperationContext for the Azure Storage operation that has
+   * listeners hooked to it that will update the metrics for this file system.
+   *
+   * @param bindConcurrentOOBIo
+   *          bind to intercept send request callbacks to handle OOB I/O.
+   *
+   * @return The OperationContext object to use.
+   */
+  private OperationContext getInstrumentedContext(boolean bindConcurrentOOBIo) {
+
+    OperationContext operationContext = new OperationContext();
+
+    if (selfThrottlingEnabled) {
+      SelfThrottlingIntercept.hook(operationContext, selfThrottlingReadFactor,
+          selfThrottlingWriteFactor);
+    }
+
+    // Bind the operation context to receive send request callbacks on this
+    // operation.
+    // If reads concurrent to OOB writes are allowed, the interception will
+    // reset the conditional header on all Azure blob storage read requests.
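+    // (Without this, an out-of-band writer changing the blob would typically
+    // change its ETag mid-read, and conditional reads would fail with
+    // "precondition failed"; resetting the conditional header lets such
+    // reads proceed.)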
+    if (bindConcurrentOOBIo) {
+      SendRequestIntercept.bind(storageInteractionLayer.getCredentials(),
+          operationContext, true);
+    }
+
+    if (testHookOperationContext != null) {
+      operationContext = testHookOperationContext
+          .modifyOperationContext(operationContext);
+    }
+
+    // Return the operation context.
+    return operationContext;
+  }
+
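+  // retrieveMetadata resolves a key in three steps: the container root ("/")
+  // is answered immediately with an implicit directory entry; an explicit
+  // blob is answered from its downloaded properties (file vs. folder decided
+  // by the folder metadata attribute); otherwise a prefix listing is probed,
+  // and any blob found underneath the key marks it as an implicit directory.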
+  @Override
+  public FileMetadata retrieveMetadata(String key) throws IOException {
+
+    // Attempts to check status may occur before opening any streams, so
+    // first check if a session exists; if not, create a session with the
+    // Azure storage server.
+    if (null == storageInteractionLayer) {
+      final String errMsg = String.format(
+          "Storage session expected for URI '%s' but does not exist.",
+          sessionUri);
+      throw new AssertionError(errMsg);
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Retrieving metadata for " + key);
+    }
+
+    try {
+      if (checkContainer(ContainerAccessType.PureRead) == ContainerState.DoesntExist) {
+        // The container doesn't exist, so spare some service calls and just
+        // return null now.
+        return null;
+      }
+
+      // Handle the degenerate cases where the key does not exist or the
+      // key is a container.
+      if (key.equals("/")) {
+        // The key refers to the root directory of the container.
+        // Set the modification time for root to zero.
+        return new FileMetadata(key, 0, defaultPermissionNoBlobMetadata(),
+            BlobMaterialization.Implicit);
+      }
+
+      CloudBlockBlobWrapper blob = getBlobReference(key);
+
+      // Download attributes and return file metadata only if the blob
+      // exists.
+      if (null != blob && blob.exists(getInstrumentedContext())) {
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Found " + key
+              + " as an explicit blob. Checking if it's a file or folder.");
+        }
+
+        // The blob exists, so capture the metadata from the blob
+        // properties.
+        blob.downloadAttributes(getInstrumentedContext());
+        BlobProperties properties = blob.getProperties();
+
+        if (retrieveFolderAttribute(blob)) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(key + " is a folder blob.");
+          }
+          return new FileMetadata(key, properties.getLastModified().getTime(),
+              getPermissionStatus(blob), BlobMaterialization.Explicit);
+        } else {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(key + " is a normal blob.");
+          }
+
+          return new FileMetadata(
+              key, // Always return denormalized key with metadata.
+              properties.getLength(), properties.getLastModified().getTime(),
+              getPermissionStatus(blob));
+        }
+      }
+
+      // There is no file with that key name, but maybe it is a folder.
+      // Query the underlying folder/container to list the blobs stored
+      // there under that key.
+      Iterable<ListBlobItem> objects = listRootBlobs(key, true,
+          EnumSet.of(BlobListingDetails.METADATA), null,
+          getInstrumentedContext());
+
+      // Check if the directory/container has the blob items.
+      for (ListBlobItem blobItem : objects) {
+        if (blobItem instanceof CloudBlockBlobWrapper) {
+          LOG.debug("Found blob as a directory; using this file under it to infer its properties: "
+              + blobItem.getUri());
+
+          blob = (CloudBlockBlobWrapper) blobItem;
+          // The key specifies a directory. Create a FileMetadata object that
+          // says as much.
+          BlobProperties properties = blob.getProperties();
+
+          return new FileMetadata(key, properties.getLastModified().getTime(),
+              getPermissionStatus(blob), BlobMaterialization.Implicit);
+        }
+      }
+
+      // Return to the caller with a null metadata object.
+      return null;
+
+    } catch (Exception e) {
+      // Re-throw the exception as an Azure storage exception.
+      throw new AzureException(e);
+    }
+  }
+
+  @Override
+  public DataInputStream retrieve(String key) throws AzureException {
+    InputStream inStream = null;
+    BufferedInputStream inBufStream = null;
+    try {
+      try {
+        // Check if a session exists; if not, create a session with the
+        // Azure storage server.
+        if (null == storageInteractionLayer) {
+          final String errMsg = String.format(
+              "Storage session expected for URI '%s' but does not exist.",
+              sessionUri);
+          throw new AssertionError(errMsg);
+        }
+        checkContainer(ContainerAccessType.PureRead);
+
+        // Get the blob reference and open the input buffer stream.
+        CloudBlockBlobWrapper blob = getBlobReference(key);
+        inStream = blob.openInputStream(getDownloadOptions(),
+            getInstrumentedContext(isConcurrentOOBAppendAllowed()));
+
+        inBufStream = new BufferedInputStream(inStream);
+
+        // Return a data input stream.
+        DataInputStream inDataStream = new DataInputStream(inBufStream);
+        return inDataStream;
+      } catch (Exception e) {
+        // Close the streams on error.
+        // We use a nested try-catch as stream.close() can throw IOException.
+        if (inBufStream != null) {
+          inBufStream.close();
+        }
+        if (inStream != null) {
+          inStream.close();
+        }
+        throw e;
+      }
+    } catch (Exception e) {
+      // Re-throw as an Azure storage exception.
+      throw new AzureException(e);
+    }
+  }
+
+  @Override
+  public DataInputStream retrieve(String key, long startByteOffset)
+      throws AzureException {
+
+    InputStream in = null;
+    DataInputStream inDataStream = null;
+    try {
+      try {
+        // Check if a session exists; if not, create a session with the
+        // Azure storage server.
+        if (null == storageInteractionLayer) {
+          final String errMsg = String.format(
+              "Storage session expected for URI '%s' but does not exist.",
+              sessionUri);
+          throw new AssertionError(errMsg);
+        }
+        checkContainer(ContainerAccessType.PureRead);
+
+        // Get the blob reference and open the input buffer stream.
+        CloudBlockBlobWrapper blob = getBlobReference(key);
+
+        // Open the input stream and seek to the start offset.
+        in = blob.openInputStream(getDownloadOptions(),
+            getInstrumentedContext(isConcurrentOOBAppendAllowed()));
+
+        // Create a data input stream.
+        inDataStream = new DataInputStream(in);
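+        // Note: InputStream#skip is allowed to skip fewer bytes than
+        // requested, so a short skip here is treated as an error rather than
+        // retried in a loop.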
+        long skippedBytes = inDataStream.skip(startByteOffset);
+        if (skippedBytes != startByteOffset) {
+          throw new IOException("Couldn't skip the requested number of bytes");
+        }
+        return inDataStream;
+      } catch (Exception e) {
+        // Close the streams on error.
+        // We use a nested try-catch as stream.close() can throw IOException.
+        if (inDataStream != null) {
+          inDataStream.close();
+        }
+        if (in != null) {
+          in.close();
+        }
+        throw e;
+      }
+    } catch (Exception e) {
+      // Re-throw as an Azure storage exception.
+      throw new AzureException(e);
+    }
+  }
+
+  @Override
+  public PartialListing list(String prefix, final int maxListingCount,
+      final int maxListingDepth) throws IOException {
+    return list(prefix, maxListingCount, maxListingDepth, null);
+  }
+
+  @Override
+  public PartialListing list(String prefix, final int maxListingCount,
+      final int maxListingDepth, String priorLastKey) throws IOException {
+    return list(prefix, PATH_DELIMITER, maxListingCount, maxListingDepth,
+        priorLastKey);
+  }
+
+  @Override
+  public PartialListing listAll(String prefix, final int maxListingCount,
+      final int maxListingDepth, String priorLastKey) throws IOException {
+    return list(prefix, null, maxListingCount, maxListingDepth, priorLastKey);
+  }
+
+  /**
+   * Searches the given list of {@link FileMetadata} objects for a directory
+   * with the given key.
+   *
+   * @param list
+   *          The list to search.
+   * @param key
+   *          The key to search for.
+   * @return The wanted directory, or null if not found.
+   */
+  private static FileMetadata getDirectoryInList(
+      final Iterable<FileMetadata> list, String key) {
+    for (FileMetadata current : list) {
+      if (current.isDir() && current.getKey().equals(key)) {
+        return current;
+      }
+    }
+    return null;
+  }
+
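+  /**
+   * Lists up to maxListingCount blobs under the given prefix, recursing into
+   * blob directories up to maxListingDepth levels (a non-positive count or a
+   * negative depth means unbounded). Note that in this implementation the
+   * delimiter argument is accepted for interface symmetry but not consulted,
+   * and priorLastKey is always reset to null in the returned listing.
+   */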
+  private PartialListing list(String prefix, String delimiter,
+      final int maxListingCount, final int maxListingDepth, String priorLastKey)
+      throws IOException {
+    try {
+      checkContainer(ContainerAccessType.PureRead);
+
+      if (0 < prefix.length() && !prefix.endsWith(PATH_DELIMITER)) {
+        prefix += PATH_DELIMITER;
+      }
+
+      Iterable<ListBlobItem> objects;
+      if (prefix.equals("/")) {
+        objects = listRootBlobs(true);
+      } else {
+        objects = listRootBlobs(prefix, true);
+      }
+
+      ArrayList<FileMetadata> fileMetadata = new ArrayList<FileMetadata>();
+      for (ListBlobItem blobItem : objects) {
+        // Check that the maximum listing count is not exhausted.
+        //
+        if (0 < maxListingCount && fileMetadata.size() >= maxListingCount) {
+          break;
+        }
+
+        if (blobItem instanceof CloudBlockBlobWrapper) {
+          String blobKey = null;
+          CloudBlockBlobWrapper blob = (CloudBlockBlobWrapper) blobItem;
+          BlobProperties properties = blob.getProperties();
+
+          // Determine the format of the blob name depending on whether an
+          // absolute path is being used or not.
+          blobKey = normalizeKey(blob);
+
+          FileMetadata metadata;
+          if (retrieveFolderAttribute(blob)) {
+            metadata = new FileMetadata(blobKey, properties.getLastModified()
+                .getTime(), getPermissionStatus(blob),
+                BlobMaterialization.Explicit);
+          } else {
+            metadata = new FileMetadata(blobKey, properties.getLength(),
+                properties.getLastModified().getTime(),
+                getPermissionStatus(blob));
+          }
+
+          // Add the metadata to the list, but first remove any existing
+          // duplicate entries that we may have added by finding nested files.
+          FileMetadata existing = getDirectoryInList(fileMetadata, blobKey);
+          if (existing != null) {
+            fileMetadata.remove(existing);
+          }
+          fileMetadata.add(metadata);
+        } else if (blobItem instanceof CloudBlobDirectoryWrapper) {
+          CloudBlobDirectoryWrapper directory = (CloudBlobDirectoryWrapper) blobItem;
+          // Determine the format of the directory name depending on whether
+          // an absolute path is being used or not.
+          //
+          String dirKey = normalizeKey(directory);
+          // Strip the last /
+          if (dirKey.endsWith(PATH_DELIMITER)) {
+            dirKey = dirKey.substring(0, dirKey.length() - 1);
+          }
+
+          // Return metadata for the directory using default permissions.
+          //
+          // Note: Something smarter should be done about permissions. Maybe
+          // inherit the permissions of the first non-directory blob.
+          // Also, getting a proper value for last-modified is tricky.
+          FileMetadata directoryMetadata = new FileMetadata(dirKey, 0,
+              defaultPermissionNoBlobMetadata(), BlobMaterialization.Implicit);
+
+          // Add the directory metadata to the list only if it's not already
+          // there.
+          if (getDirectoryInList(fileMetadata, dirKey) == null) {
+            fileMetadata.add(directoryMetadata);
+          }
+
+          // Currently at a depth of one; decrement the listing depth for
+          // sub-directories.
+          buildUpList(directory, fileMetadata, maxListingCount,
+              maxListingDepth - 1);
+        }
+      }
+      // Note: Original code indicated that this may be a hack.
+      priorLastKey = null;
+      return new PartialListing(priorLastKey,
+          fileMetadata.toArray(new FileMetadata[] {}),
+          0 == fileMetadata.size() ? new String[] {} : new String[] { prefix });
+    } catch (Exception e) {
+      // Re-throw as an Azure storage exception.
+      //
+      throw new AzureException(e);
+    }
+  }
+
+  /**
+   * Build up a metadata list of blobs in an Azure blob directory. This method
+   * uses an in-order traversal of blob directory structures to maintain the
+   * sorted order of the blob names.
+   *
+   * @param aCloudBlobDirectory
+   *          Azure blob directory.
+   * @param aFileMetadataList
+   *          a list of file metadata objects for each non-directory blob.
+   * @param maxListingCount
+   *          maximum length of the built-up list.
+   * @param maxListingDepth
+   *          maximum depth of directories to recurse into; negative means
+   *          unbounded.
+   */
+  private void buildUpList(CloudBlobDirectoryWrapper aCloudBlobDirectory,
+      ArrayList<FileMetadata> aFileMetadataList, final int maxListingCount,
+      final int maxListingDepth) throws Exception {
+
+    // Stack of blob-directory iterators used for the iterative,
+    // non-recursive traversal below.
+    LinkedList<Iterator<ListBlobItem>> dirIteratorStack = new LinkedList<Iterator<ListBlobItem>>();
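+
+    // The traversal keeps an explicit stack of parent iterators: descending
+    // into a sub-directory pushes the current iterator and switches to the
+    // child's; exhausting an iterator pops the parent back off the stack.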
+
+    Iterable<ListBlobItem> blobItems = aCloudBlobDirectory.listBlobs(null,
+        false, EnumSet.of(BlobListingDetails.METADATA), null,
+        getInstrumentedContext());
+    Iterator<ListBlobItem> blobItemIterator = blobItems.iterator();
+
+    if (0 == maxListingDepth || 0 == maxListingCount) {
+      // Recursion depth and listing count are already exhausted. Return
+      // immediately.
+      return;
+    }
+
+    // The directory listing depth is unbounded if the maximum listing depth
+    // is negative.
+    final boolean isUnboundedDepth = (maxListingDepth < 0);
+
+    // Reset the current directory listing depth.
+    int listingDepth = 1;
+
+    // Loop until all directories have been traversed in-order. Loop only
+    // while the following conditions are satisfied:
+    // (1) The stack is not empty, and
+    // (2) maxListingCount > 0 implies that the number of items in the
+    // metadata list is less than the max listing count.
+    while (null != blobItemIterator
+        && (maxListingCount <= 0 || aFileMetadataList.size() < maxListingCount)) {
+      while (blobItemIterator.hasNext()) {
+        // Check if the count of items on the list exhausts the maximum
+        // listing count.
+        //
+        if (0 < maxListingCount && aFileMetadataList.size() >= maxListingCount) {
+          break;
+        }
+
+        ListBlobItem blobItem = blobItemIterator.next();
+
+        // Add the file metadata to the list if this is not a blob
+        // directory item.
+        if (blobItem instanceof CloudBlockBlobWrapper) {
+          String blobKey = null;
+          CloudBlockBlobWrapper blob = (CloudBlockBlobWrapper) blobItem;
+          BlobProperties properties = blob.getProperties();
+
+          // Determine the format of the blob name depending on whether an
+          // absolute path is being used or not.
+          blobKey = normalizeKey(blob);
+
+          FileMetadata metadata;
+          if (retrieveFolderAttribute(blob)) {
+            metadata = new FileMetadata(blobKey, properties.getLastModified()
+                .getTime(), getPermissionStatus(blob),
+                BlobMaterialization.Explicit);
+          } else {
+            metadata = new FileMetadata(blobKey, properties.getLength(),
+                properties.getLastModified().getTime(),
+                getPermissionStatus(blob));
+          }
+
+          // Add the metadata to the list, but first remove any existing
+          // duplicate directory entry that we may have added by finding
+          // nested files.
+          FileMetadata existing = getDirectoryInList(aFileMetadataList, blobKey);
+          if (existing != null) {
+            aFileMetadataList.remove(existing);
+          }
+          aFileMetadataList.add(metadata);
+        } else if (blobItem instanceof CloudBlobDirectoryWrapper) {
+          CloudBlobDirectoryWrapper directory = (CloudBlobDirectoryWrapper) blobItem;
+
+          // This is a directory blob: push the current iterator onto
+          // the stack of iterators and start iterating through the current
+          // directory.
+          if (isUnboundedDepth || maxListingDepth > listingDepth) {
+            // Push the current directory on the stack and increment the
+            // listing depth.
+            dirIteratorStack.push(blobItemIterator);
+            ++listingDepth;
+
+            // The current blob item represents the new directory. Get
+            // an iterator for this directory and continue by iterating through
+            // this directory.
+            blobItems = directory.listBlobs(null, false,
+                EnumSet.noneOf(BlobListingDetails.class), null,
+                getInstrumentedContext());
+            blobItemIterator = blobItems.iterator();
+          } else {
+            // Determine the format of the directory name depending on whether
+            // an absolute path is being used or not.
+            String dirKey = normalizeKey(directory);
+
+            if (getDirectoryInList(aFileMetadataList, dirKey) == null) {
+              // Reached the targeted listing depth. Return metadata for the
+              // directory using default permissions.
+              //
+              // Note: Something smarter should be done about permissions. Maybe
+              // inherit the permissions of the first non-directory blob.
+              // Also, getting a proper value for last-modified is tricky.
+              FileMetadata directoryMetadata = new FileMetadata(dirKey, 0,
+                  defaultPermissionNoBlobMetadata(),
+                  BlobMaterialization.Implicit);
+
+              // Add the directory metadata to the list.
+              aFileMetadataList.add(directoryMetadata);
+            }
+          }
+        }
+      }
+
+      // Traversal of the directory tree.
+
+      // Check if the iterator stack is empty. If it is, set the next blob
+      // iterator to null. This will act as a terminator for the outer loop.
+      // Otherwise pop the next iterator from the stack and continue looping.
+      //
+      if (dirIteratorStack.isEmpty()) {
+        blobItemIterator = null;
+      } else {
+        // Pop the next directory item from the stack and decrement the
+        // depth.
+        blobItemIterator = dirIteratorStack.pop();
+        --listingDepth;
+
+        // Assertion: listing depth should not be less than zero.
+        if (listingDepth < 0) {
+          throw new AssertionError("Non-negative listing depth expected");
+        }
+      }
+    }
+  }
+
+  /**
+   * Deletes the given blob, taking special care that if we get a blob-not-found
+   * exception upon retrying the operation, we just swallow the error, since what
+   * most probably happened is that the first operation succeeded on the server.
+   *
+   * @param blob
+   *          The blob to delete.
+   * @throws StorageException
+   */
+  private void safeDelete(CloudBlockBlobWrapper blob) throws StorageException {
+    OperationContext operationContext = getInstrumentedContext();
+    try {
+      blob.delete(operationContext);
+    } catch (StorageException e) {
+      // On exception, swallow it only if:
+      // 1. It's a BlobNotFound exception, AND
+      // 2. It happened after one or more retries.
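+      //
+      // getRequestResults() records one entry per physical request attempt,
+      // so size() > 1 means the operation was retried, and a non-null
+      // exception on the first result means the initial attempt failed in
+      // flight (most likely after the delete took effect server-side).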
+      if (e.getErrorCode() != null && e.getErrorCode().equals("BlobNotFound")
+          && operationContext.getRequestResults().size() > 1
+          && operationContext.getRequestResults().get(0).getException() != null) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Swallowing delete exception on retry: " + e.getMessage());
+        }
+        return;
+      } else {
+        throw e;
+      }
+    }
+  }
+
+  @Override
+  public void delete(String key) throws IOException {
+    try {
+      if (checkContainer(ContainerAccessType.ReadThenWrite) == ContainerState.DoesntExist) {
+        // Container doesn't exist, no need to do anything.
+        return;
+      }
+
+      // Get the blob reference and delete it.
+      CloudBlockBlobWrapper blob = getBlobReference(key);
+      if (blob.exists(getInstrumentedContext())) {
+        safeDelete(blob);
+      }
+    } catch (Exception e) {
+      // Re-throw as an Azure storage exception.
+      throw new AzureException(e);
+    }
+  }
+
+  @Override
+  public void rename(String srcKey, String dstKey) throws IOException {
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Moving " + srcKey + " to " + dstKey);
+    }
+
+    try {
+      // Attempts to rename may occur before opening any streams, so first
+      // check if a session exists; if not, create a session with the Azure
+      // storage server.
+      if (null == storageInteractionLayer) {
+        final String errMsg = String.format(
+            "Storage session expected for URI '%s' but does not exist.",
+            sessionUri);
+        throw new AssertionError(errMsg);
+      }
+
+      checkContainer(ContainerAccessType.ReadThenWrite);
+      // Get the source blob and assert its existence. If the source key
+      // needs to be normalized then normalize it.
+      CloudBlockBlobWrapper srcBlob = getBlobReference(srcKey);
+
+      if (!srcBlob.exists(getInstrumentedContext())) {
+        throw new AzureException("Source blob " + srcKey + " does not exist.");
+      }
+
+      // Get the destination blob. The destination key always needs to be
+      // normalized.
+      CloudBlockBlobWrapper dstBlob = getBlobReference(dstKey);
+
+      // Rename the source blob to the destination blob by copying it to
+      // the destination blob then deleting it.
+      //
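+      // Note that the copy is performed asynchronously on the service side:
+      // startCopyFromBlob only kicks it off, and waitForCopyToComplete polls
+      // the blob's copy state until it is no longer pending.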
+      dstBlob.startCopyFromBlob(srcBlob, getInstrumentedContext());
+      waitForCopyToComplete(dstBlob, getInstrumentedContext());
+
+      safeDelete(srcBlob);
+    } catch (Exception e) {
+      // Re-throw the exception as an Azure storage exception.
+      throw new AzureException(e);
+    }
+  }
+
+  private void waitForCopyToComplete(CloudBlockBlobWrapper blob,
+      OperationContext opContext) throws AzureException {
+    boolean copyInProgress = true;
+    int exceptionCount = 0;
+    while (copyInProgress) {
+      try {
+        blob.downloadAttributes(opContext);
+      } catch (StorageException se) {
+        exceptionCount++;
+        if (exceptionCount > 10) {
+          throw new AzureException(
+              "Too many storage exceptions during waitForCopyToComplete", se);
+        }
+      }
+
+      // Test for null because the mocked file system doesn't know about copy
+      // states yet.
+      copyInProgress = (blob.getCopyState() != null && blob.getCopyState()
+          .getStatus() == CopyStatus.PENDING);
+      if (copyInProgress) {
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException ie) {
+          Thread.currentThread().interrupt();
+        }
+      }
+    }
+  }
+
+  /**
+   * Changes the permission status on the given key.
+   */
+  @Override
+  public void changePermissionStatus(String key, PermissionStatus newPermission)
+      throws AzureException {
+    try {
+      checkContainer(ContainerAccessType.ReadThenWrite);
+      CloudBlockBlobWrapper blob = getBlobReference(key);
+      blob.downloadAttributes(getInstrumentedContext());
+      storePermissionStatus(blob, newPermission);
+      blob.uploadMetadata(getInstrumentedContext());
+    } catch (Exception e) {
+      throw new AzureException(e);
+    }
+  }
+
+  @Override
+  public void purge(String prefix) throws IOException {
+    try {
+
+      // Attempts to purge may occur before opening any streams, so first
+      // check if a session exists; if not, create a session with the Azure
+      // storage server.
+      if (null == storageInteractionLayer) {
+        final String errMsg = String.format(
+            "Storage session expected for URI '%s' but does not exist.",
+            sessionUri);
+        throw new AssertionError(errMsg);
+      }
+
+      if (checkContainer(ContainerAccessType.ReadThenWrite) == ContainerState.DoesntExist) {
+        // Container doesn't exist, no need to do anything.
+        return;
+      }
+      // Get all blob items with the given prefix from the container and delete
+      // them.
+      Iterable<ListBlobItem> objects = listRootBlobs(prefix, false);
+      for (ListBlobItem blobItem : objects) {
+        ((CloudBlob) blobItem).delete(DeleteSnapshotsOption.NONE, null, null,
+            getInstrumentedContext());
+      }
+    } catch (Exception e) {
+      // Re-throw as an Azure storage exception.
+      //
+      throw new AzureException(e);
+    }
+  }
+
+  @Override
+  public void updateFolderLastModifiedTime(String key, Date lastModified)
+      throws AzureException {
+    try {
+      checkContainer(ContainerAccessType.ReadThenWrite);
+      CloudBlockBlobWrapper blob = getBlobReference(key);
+      blob.getProperties().setLastModified(lastModified);
+      blob.uploadProperties(getInstrumentedContext());
+    } catch (Exception e) {
+      // Caught exception while attempting to update the properties. Re-throw
+      // as an Azure storage exception.
+      throw new AzureException(e);
+    }
+  }
+
+  @Override
+  public void updateFolderLastModifiedTime(String key) throws AzureException {
+    final Calendar lastModifiedCalendar = Calendar
+        .getInstance(Utility.LOCALE_US);
+    lastModifiedCalendar.setTimeZone(Utility.UTC_ZONE);
+    Date lastModified = lastModifiedCalendar.getTime();
+    updateFolderLastModifiedTime(key, lastModified);
+  }
+
+  @Override
+  public void dump() throws IOException {
+  }
+
+  @Override
+  public void close() {
+  }
+}