|
|
@@ -20,7 +20,12 @@ package org.apache.hadoop.fs.gs;
|
|
|
|
|
|
import static org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.*;
|
|
|
import static org.apache.hadoop.thirdparty.com.google.common.base.Strings.isNullOrEmpty;
|
|
|
+import static java.lang.Math.toIntExact;
|
|
|
|
|
|
+import com.google.api.client.util.BackOff;
|
|
|
+import com.google.api.client.util.ExponentialBackOff;
|
|
|
+import com.google.api.client.util.Sleeper;
|
|
|
+import com.google.api.gax.paging.Page;
|
|
|
import com.google.cloud.storage.*;
|
|
|
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
|
|
|
import org.apache.hadoop.thirdparty.com.google.common.collect.Maps;
|
|
|
@@ -32,21 +37,34 @@ import javax.annotation.Nullable;
|
|
|
import java.io.IOException;
|
|
|
import java.nio.channels.WritableByteChannel;
|
|
|
import java.nio.file.FileAlreadyExistsException;
|
|
|
+import java.time.Duration;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.Collections;
|
|
|
+import java.util.HashMap;
|
|
|
+import java.util.HashSet;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
+import java.util.Set;
|
|
|
|
|
|
/**
|
|
|
* A wrapper around <a href="https://github.com/googleapis/java-storage">Google cloud storage
|
|
|
* client</a>.
|
|
|
*/
|
|
|
class GoogleCloudStorage {
|
|
|
- public static final Logger LOG = LoggerFactory.getLogger(GoogleHadoopFileSystem.class);
|
|
|
+ static final Logger LOG = LoggerFactory.getLogger(GoogleHadoopFileSystem.class);
|
|
|
static final List<Storage.BlobField> BLOB_FIELDS =
|
|
|
- ImmutableList.of(Storage.BlobField.BUCKET, Storage.BlobField.CONTENT_ENCODING,
|
|
|
+ ImmutableList.of(
|
|
|
+ Storage.BlobField.BUCKET, Storage.BlobField.CONTENT_ENCODING,
|
|
|
Storage.BlobField.CONTENT_TYPE, Storage.BlobField.CRC32C, Storage.BlobField.GENERATION,
|
|
|
Storage.BlobField.METADATA, Storage.BlobField.MD5HASH, Storage.BlobField.METAGENERATION,
|
|
|
Storage.BlobField.NAME, Storage.BlobField.SIZE, Storage.BlobField.TIME_CREATED,
|
|
|
Storage.BlobField.UPDATED);
|
|
|
+
|
|
|
+ static final CreateObjectOptions EMPTY_OBJECT_CREATE_OPTIONS =
|
|
|
+ CreateObjectOptions.DEFAULT_OVERWRITE.toBuilder()
|
|
|
+ .setEnsureEmptyObjectsMetadataMatch(false)
|
|
|
+ .build();
|
|
|
+
|
|
|
private final Storage storage;
|
|
|
private final GoogleHadoopFileSystemConfiguration configuration;
|
|
|
|
|
|
@@ -55,13 +73,20 @@ class GoogleCloudStorage {
|
|
|
* is in WIP.
|
|
|
*/
|
|
|
GoogleCloudStorage(GoogleHadoopFileSystemConfiguration configuration) throws IOException {
|
|
|
- // TODO: Set projectId
|
|
|
// TODO: Set credentials
|
|
|
- this.storage = StorageOptions.newBuilder().build().getService();
|
|
|
+ this.storage = createStorage(configuration.getProjectId());
|
|
|
this.configuration = configuration;
|
|
|
}
|
|
|
|
|
|
- public WritableByteChannel create(final StorageResourceId resourceId, final CreateOptions options)
|
|
|
+ private static Storage createStorage(String projectId) {
|
|
|
+ if (projectId != null) {
|
|
|
+ return StorageOptions.newBuilder().setProjectId(projectId).build().getService();
|
|
|
+ }
|
|
|
+
|
|
|
+ return StorageOptions.newBuilder().build().getService();
|
|
|
+ }
|
|
|
+
|
|
|
+ WritableByteChannel create(final StorageResourceId resourceId, final CreateOptions options)
|
|
|
throws IOException {
|
|
|
LOG.trace("create({})", resourceId);
|
|
|
|
|
|
@@ -104,7 +129,7 @@ class GoogleCloudStorage {
|
|
|
throw new FileAlreadyExistsException(String.format("Object %s already exists.", resourceId));
|
|
|
}
|
|
|
|
|
|
- public void close() {
|
|
|
+ void close() {
|
|
|
try {
|
|
|
storage.close();
|
|
|
} catch (Exception e) {
|
|
|
@@ -112,7 +137,7 @@ class GoogleCloudStorage {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public GoogleCloudStorageItemInfo getItemInfo(StorageResourceId resourceId) throws IOException {
|
|
|
+ GoogleCloudStorageItemInfo getItemInfo(StorageResourceId resourceId) throws IOException {
|
|
|
LOG.trace("getItemInfo({})", resourceId);
|
|
|
|
|
|
// Handle ROOT case first.
|
|
|
@@ -258,4 +283,350 @@ class GoogleCloudStorage {
|
|
|
bucket.getLocation(),
|
|
|
bucket.getStorageClass() == null ? null : bucket.getStorageClass().name());
|
|
|
}
|
|
|
+
|
|
|
+ List<GoogleCloudStorageItemInfo> listObjectInfo(
|
|
|
+ String bucketName,
|
|
|
+ String objectNamePrefix,
|
|
|
+ ListObjectOptions listOptions) throws IOException {
|
|
|
+ try {
|
|
|
+ long maxResults = listOptions.getMaxResults() > 0 ?
|
|
|
+ listOptions.getMaxResults() + (listOptions.isIncludePrefix() ? 0 : 1) :
|
|
|
+ listOptions.getMaxResults();
|
|
|
+
|
|
|
+ Storage.BlobListOption[] blobListOptions =
|
|
|
+ getBlobListOptions(objectNamePrefix, listOptions, maxResults);
|
|
|
+ Page<Blob> blobs = storage.list(bucketName, blobListOptions);
|
|
|
+ ListOperationResult result = new ListOperationResult(maxResults);
|
|
|
+ for (Blob blob : blobs.iterateAll()) {
|
|
|
+ result.add(blob);
|
|
|
+ }
|
|
|
+
|
|
|
+ return result.getItems();
|
|
|
+ } catch (StorageException e) {
|
|
|
+ throw new IOException(
|
|
|
+ String.format("listing object '%s' failed.", BlobId.of(bucketName, objectNamePrefix)),
|
|
|
+ e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private Storage.BlobListOption[] getBlobListOptions(
|
|
|
+ String objectNamePrefix, ListObjectOptions listOptions, long maxResults) {
|
|
|
+ List<Storage.BlobListOption> options = new ArrayList<>();
|
|
|
+
|
|
|
+ options.add(Storage.BlobListOption.fields(BLOB_FIELDS.toArray(new Storage.BlobField[0])));
|
|
|
+ options.add(Storage.BlobListOption.prefix(objectNamePrefix));
|
|
|
+ // TODO: set max results as a BlobListOption
|
|
|
+ if ("/".equals(listOptions.getDelimiter())) {
|
|
|
+ options.add(Storage.BlobListOption.currentDirectory());
|
|
|
+ }
|
|
|
+
|
|
|
+ if (listOptions.getDelimiter() != null) {
|
|
|
+ options.add(Storage.BlobListOption.includeTrailingDelimiter());
|
|
|
+ }
|
|
|
+
|
|
|
+ return options.toArray(new Storage.BlobListOption[0]);
|
|
|
+ }
|
|
|
+
|
|
|
+ private GoogleCloudStorageItemInfo createItemInfoForBlob(Blob blob) {
|
|
|
+ long generationId = blob.getGeneration() == null ? 0L : blob.getGeneration();
|
|
|
+ StorageResourceId resourceId =
|
|
|
+ new StorageResourceId(blob.getBucket(), blob.getName(), generationId);
|
|
|
+ return createItemInfoForBlob(resourceId, blob);
|
|
|
+ }
|
|
|
+
|
|
|
+ void createBucket(String bucketName, CreateBucketOptions options) throws IOException {
|
|
|
+ LOG.trace("createBucket({})", bucketName);
|
|
|
+ checkArgument(!isNullOrEmpty(bucketName), "bucketName must not be null or empty");
|
|
|
+ checkNotNull(options, "options must not be null");
|
|
|
+
|
|
|
+ BucketInfo.Builder bucketInfoBuilder =
|
|
|
+ BucketInfo.newBuilder(bucketName).setLocation(options.getLocation());
|
|
|
+
|
|
|
+ if (options.getStorageClass() != null) {
|
|
|
+ bucketInfoBuilder.setStorageClass(
|
|
|
+ StorageClass.valueOfStrict(options.getStorageClass().toUpperCase()));
|
|
|
+ }
|
|
|
+ if (options.getTtl() != null) {
|
|
|
+ bucketInfoBuilder.setLifecycleRules(
|
|
|
+ Collections.singletonList(
|
|
|
+ new BucketInfo.LifecycleRule(
|
|
|
+ BucketInfo.LifecycleRule.LifecycleAction.newDeleteAction(),
|
|
|
+ BucketInfo.LifecycleRule.LifecycleCondition.newBuilder()
|
|
|
+ .setAge(toIntExact(options.getTtl().toDays()))
|
|
|
+ .build())));
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ storage.create(bucketInfoBuilder.build());
|
|
|
+ } catch (StorageException e) {
|
|
|
+ if (ErrorTypeExtractor.bucketAlreadyExists(e)) {
|
|
|
+ throw (FileAlreadyExistsException)
|
|
|
+ new FileAlreadyExistsException(String.format("Bucket '%s' already exists.", bucketName))
|
|
|
+ .initCause(e);
|
|
|
+ }
|
|
|
+ throw new IOException(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ void createEmptyObject(StorageResourceId resourceId) throws IOException {
|
|
|
+ LOG.trace("createEmptyObject({})", resourceId);
|
|
|
+ checkArgument(
|
|
|
+ resourceId.isStorageObject(), "Expected full StorageObject id, got %s", resourceId);
|
|
|
+ createEmptyObject(resourceId, EMPTY_OBJECT_CREATE_OPTIONS);
|
|
|
+ }
|
|
|
+
|
|
|
+ void createEmptyObject(StorageResourceId resourceId, CreateObjectOptions options)
|
|
|
+ throws IOException {
|
|
|
+ checkArgument(
|
|
|
+ resourceId.isStorageObject(), "Expected full StorageObject id, got %s", resourceId);
|
|
|
+
|
|
|
+ try {
|
|
|
+ createEmptyObjectInternal(resourceId, options);
|
|
|
+ } catch (StorageException e) {
|
|
|
+ if (canIgnoreExceptionForEmptyObject(e, resourceId, options)) {
|
|
|
+ LOG.info(
|
|
|
+ "Ignoring exception of type {}; verified object already exists with desired state.",
|
|
|
+ e.getClass().getSimpleName());
|
|
|
+ LOG.trace("Ignored exception while creating empty object: {}", resourceId, e);
|
|
|
+ } else {
|
|
|
+ if (ErrorTypeExtractor.getErrorType(e) == ErrorTypeExtractor.ErrorType.ALREADY_EXISTS) {
|
|
|
+ throw (FileAlreadyExistsException)
|
|
|
+ new FileAlreadyExistsException(
|
|
|
+ String.format("Object '%s' already exists.", resourceId)
|
|
|
+ ).initCause(e);
|
|
|
+ }
|
|
|
+ throw new IOException(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Helper to check whether an empty object already exists with the expected metadata specified in
|
|
|
+ * {@code options}, to be used to determine whether it's safe to ignore an exception that was
|
|
|
+ * thrown when trying to create the object, {@code exceptionOnCreate}.
|
|
|
+ */
|
|
|
+ private boolean canIgnoreExceptionForEmptyObject(
|
|
|
+ StorageException exceptionOnCreate, StorageResourceId resourceId, CreateObjectOptions options)
|
|
|
+ throws IOException {
|
|
|
+ ErrorTypeExtractor.ErrorType errorType = ErrorTypeExtractor.getErrorType(exceptionOnCreate);
|
|
|
+ if (shouldBackoff(resourceId, errorType)) {
|
|
|
+ GoogleCloudStorageItemInfo existingInfo;
|
|
|
+ Duration maxWaitTime = Duration.ofSeconds(3); // TODO: make this configurable
|
|
|
+
|
|
|
+ BackOff backOff =
|
|
|
+ !maxWaitTime.isZero() && !maxWaitTime.isNegative()
|
|
|
+ ? new ExponentialBackOff.Builder()
|
|
|
+ .setMaxElapsedTimeMillis(toIntExact(maxWaitTime.toMillis()))
|
|
|
+ .setMaxIntervalMillis(500)
|
|
|
+ .setInitialIntervalMillis(100)
|
|
|
+ .setMultiplier(1.5)
|
|
|
+ .setRandomizationFactor(0.15)
|
|
|
+ .build()
|
|
|
+ : BackOff.STOP_BACKOFF;
|
|
|
+ long nextSleep = 0L;
|
|
|
+ do {
|
|
|
+ if (nextSleep > 0) {
|
|
|
+ try {
|
|
|
+ Sleeper.DEFAULT.sleep(nextSleep);
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ nextSleep = BackOff.STOP;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ existingInfo = getItemInfo(resourceId);
|
|
|
+ nextSleep = nextSleep == BackOff.STOP ? BackOff.STOP : backOff.nextBackOffMillis();
|
|
|
+ } while (!existingInfo.exists() && nextSleep != BackOff.STOP);
|
|
|
+
|
|
|
+ // Compare existence, size, and metadata; for 429 errors creating an empty object,
|
|
|
+ // we don't care about metaGeneration/contentGeneration as long as the metadata
|
|
|
+ // matches, since we don't know for sure whether our low-level request succeeded
|
|
|
+ // first or some other client succeeded first.
|
|
|
+ if (existingInfo.exists() && existingInfo.getSize() == 0) {
|
|
|
+ if (options.isEnsureEmptyObjectsMetadataMatch()) {
|
|
|
+ return existingInfo.metadataEquals(options.getMetadata());
|
|
|
+ }
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ private static boolean shouldBackoff(StorageResourceId resourceId,
|
|
|
+ ErrorTypeExtractor.ErrorType errorType) {
|
|
|
+ return errorType == ErrorTypeExtractor.ErrorType.RESOURCE_EXHAUSTED
|
|
|
+ || errorType == ErrorTypeExtractor.ErrorType.INTERNAL ||
|
|
|
+ (resourceId.isDirectory() && errorType == ErrorTypeExtractor.ErrorType.FAILED_PRECONDITION);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void createEmptyObjectInternal(
|
|
|
+ StorageResourceId resourceId, CreateObjectOptions createObjectOptions) throws IOException {
|
|
|
+ Map<String, String> rewrittenMetadata = encodeMetadata(createObjectOptions.getMetadata());
|
|
|
+
|
|
|
+ List<Storage.BlobTargetOption> blobTargetOptions = new ArrayList<>();
|
|
|
+ blobTargetOptions.add(Storage.BlobTargetOption.disableGzipContent());
|
|
|
+ if (resourceId.hasGenerationId()) {
|
|
|
+ blobTargetOptions.add(Storage.BlobTargetOption.generationMatch(resourceId.getGenerationId()));
|
|
|
+ } else if (resourceId.isDirectory() || !createObjectOptions.isOverwriteExisting()) {
|
|
|
+ blobTargetOptions.add(Storage.BlobTargetOption.doesNotExist());
|
|
|
+ }
|
|
|
+
|
|
|
+ try {
|
|
|
+ // TODO: Set encryption key and related properties
|
|
|
+ storage.create(
|
|
|
+ BlobInfo.newBuilder(BlobId.of(resourceId.getBucketName(), resourceId.getObjectName()))
|
|
|
+ .setMetadata(rewrittenMetadata)
|
|
|
+ .setContentEncoding(createObjectOptions.getContentEncoding())
|
|
|
+ .setContentType(createObjectOptions.getContentType())
|
|
|
+ .build(),
|
|
|
+ blobTargetOptions.toArray(new Storage.BlobTargetOption[0]));
|
|
|
+ } catch (StorageException e) {
|
|
|
+ throw new IOException(String.format("Creating empty object %s failed.", resourceId), e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private static Map<String, String> encodeMetadata(Map<String, byte[]> metadata) {
|
|
|
+ return Maps.transformValues(metadata, GoogleCloudStorage::encodeMetadataValues);
|
|
|
+ }
|
|
|
+
|
|
|
+ private static String encodeMetadataValues(byte[] bytes) {
|
|
|
+ return bytes == null ? null : BaseEncoding.base64().encode(bytes);
|
|
|
+ }
|
|
|
+
|
|
|
+ List<GoogleCloudStorageItemInfo> listDirectoryRecursive(String bucketName, String objectName)
|
|
|
+ throws IOException {
|
|
|
+ // TODO: Take delimiter from config
|
|
|
+ // TODO: Set specific fields
|
|
|
+
|
|
|
+ try {
|
|
|
+ Page<Blob> blobs = storage.list(
|
|
|
+ bucketName,
|
|
|
+ Storage.BlobListOption.prefix(objectName));
|
|
|
+
|
|
|
+ List<GoogleCloudStorageItemInfo> result = new ArrayList<>();
|
|
|
+ for (Blob blob : blobs.iterateAll()) {
|
|
|
+ result.add(createItemInfoForBlob(blob));
|
|
|
+ }
|
|
|
+
|
|
|
+ return result;
|
|
|
+ } catch (StorageException e) {
|
|
|
+ throw new IOException(
|
|
|
+ String.format("Listing '%s' failed", BlobId.of(bucketName, objectName)), e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ void deleteObjects(List<StorageResourceId> fullObjectNames) throws IOException {
|
|
|
+ LOG.trace("deleteObjects({})", fullObjectNames);
|
|
|
+
|
|
|
+ if (fullObjectNames.isEmpty()) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // Validate that all the elements represent StorageObjects.
|
|
|
+ for (StorageResourceId toDelete : fullObjectNames) {
|
|
|
+ checkArgument(
|
|
|
+ toDelete.isStorageObject(),
|
|
|
+ "Expected full StorageObject names only, got: %s",
|
|
|
+ toDelete);
|
|
|
+ }
|
|
|
+
|
|
|
+ // TODO: Do this concurrently
|
|
|
+ // TODO: There is duplication. fix it
|
|
|
+ for (StorageResourceId toDelete : fullObjectNames) {
|
|
|
+ try {
|
|
|
+ LOG.trace("Deleting Object ({})", toDelete);
|
|
|
+ if (toDelete.hasGenerationId() && toDelete.getGenerationId() != 0) {
|
|
|
+ storage.delete(
|
|
|
+ BlobId.of(toDelete.getBucketName(), toDelete.getObjectName()),
|
|
|
+ Storage.BlobSourceOption.generationMatch(toDelete.getGenerationId()));
|
|
|
+ } else {
|
|
|
+ // TODO: Remove delete without generationId
|
|
|
+ storage.delete(BlobId.of(toDelete.getBucketName(), toDelete.getObjectName()));
|
|
|
+
|
|
|
+ LOG.trace("Deleting Object without generationId ({})", toDelete);
|
|
|
+ }
|
|
|
+ } catch (StorageException e) {
|
|
|
+ throw new IOException(String.format("Deleting resource %s failed.", toDelete), e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ List<GoogleCloudStorageItemInfo> listBucketInfo() throws IOException {
|
|
|
+ List<Bucket> allBuckets = listBucketsInternal();
|
|
|
+ List<GoogleCloudStorageItemInfo> bucketInfos = new ArrayList<>(allBuckets.size());
|
|
|
+ for (Bucket bucket : allBuckets) {
|
|
|
+ bucketInfos.add(createItemInfoForBucket(new StorageResourceId(bucket.getName()), bucket));
|
|
|
+ }
|
|
|
+ return bucketInfos;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ private List<Bucket> listBucketsInternal() throws IOException {
|
|
|
+ checkNotNull(configuration.getProjectId(), "projectId must not be null");
|
|
|
+ List<Bucket> allBuckets = new ArrayList<>();
|
|
|
+ try {
|
|
|
+ Page<Bucket> buckets =
|
|
|
+ storage.list(
|
|
|
+ Storage.BucketListOption.pageSize(configuration.getMaxListItemsPerCall()),
|
|
|
+ Storage.BucketListOption.fields(
|
|
|
+ Storage.BucketField.LOCATION,
|
|
|
+ Storage.BucketField.STORAGE_CLASS,
|
|
|
+ Storage.BucketField.TIME_CREATED,
|
|
|
+ Storage.BucketField.UPDATED));
|
|
|
+
|
|
|
+ // Loop to fetch all the items.
|
|
|
+ for (Bucket bucket : buckets.iterateAll()) {
|
|
|
+ allBuckets.add(bucket);
|
|
|
+ }
|
|
|
+ } catch (StorageException e) {
|
|
|
+ throw new IOException(e);
|
|
|
+ }
|
|
|
+ return allBuckets;
|
|
|
+ }
|
|
|
+
|
|
|
+ // Helper class to capture the results of list operation.
|
|
|
+ private class ListOperationResult {
|
|
|
+ private final Map<String, Blob> prefixes = new HashMap<>();
|
|
|
+ private final List<Blob> objects = new ArrayList<>();
|
|
|
+
|
|
|
+ private final Set<String> objectsSet = new HashSet<>();
|
|
|
+
|
|
|
+ private final long maxResults;
|
|
|
+
|
|
|
+ ListOperationResult(long maxResults) {
|
|
|
+ this.maxResults = maxResults;
|
|
|
+ }
|
|
|
+
|
|
|
+ void add(Blob blob) {
|
|
|
+ String path = blob.getBlobId().toGsUtilUri();
|
|
|
+ if (blob.getGeneration() != null) {
|
|
|
+ prefixes.remove(path);
|
|
|
+ objects.add(blob);
|
|
|
+
|
|
|
+ objectsSet.add(path);
|
|
|
+ } else if (!objectsSet.contains(path)) {
|
|
|
+ prefixes.put(path, blob);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ List<GoogleCloudStorageItemInfo> getItems() {
|
|
|
+ List<GoogleCloudStorageItemInfo> result = new ArrayList<>(prefixes.size() + objects.size());
|
|
|
+
|
|
|
+ for (Blob blob : objects) {
|
|
|
+ result.add(createItemInfoForBlob(blob));
|
|
|
+
|
|
|
+ if (result.size() == maxResults) {
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ for (Blob blob : prefixes.values()) {
|
|
|
+ if (result.size() == maxResults) {
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+
|
|
|
+ result.add(createItemInfoForBlob(blob));
|
|
|
+ }
|
|
|
+
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|