|
@@ -15,15 +15,16 @@
|
|
|
* See the License for the specific language governing permissions and
|
|
|
* limitations under the License.
|
|
|
*/
|
|
|
+package org.apache.hadoop.fs.azurebfs;
|
|
|
|
|
|
-package org.apache.hadoop.fs.azurebfs.services;
|
|
|
-
|
|
|
-import javax.xml.bind.DatatypeConverter;
|
|
|
import java.io.File;
|
|
|
+import java.io.IOException;
|
|
|
import java.io.InputStream;
|
|
|
import java.io.OutputStream;
|
|
|
+import java.net.MalformedURLException;
|
|
|
import java.net.URI;
|
|
|
import java.net.URISyntaxException;
|
|
|
+import java.net.URL;
|
|
|
import java.nio.ByteBuffer;
|
|
|
import java.nio.CharBuffer;
|
|
|
import java.nio.charset.CharacterCodingException;
|
|
@@ -32,89 +33,110 @@ import java.nio.charset.CharsetDecoder;
|
|
|
import java.nio.charset.CharsetEncoder;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Arrays;
|
|
|
-import java.util.Set;
|
|
|
import java.util.HashSet;
|
|
|
import java.util.Hashtable;
|
|
|
import java.util.Map;
|
|
|
-import java.util.concurrent.ConcurrentHashMap;
|
|
|
+import java.util.Set;
|
|
|
+import javax.xml.bind.DatatypeConverter;
|
|
|
|
|
|
+import com.google.common.annotations.VisibleForTesting;
|
|
|
import com.google.common.base.Preconditions;
|
|
|
-import com.google.inject.Inject;
|
|
|
-import com.google.inject.Singleton;
|
|
|
-import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
|
|
|
-import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAbfsRestOperationException;
|
|
|
-import org.joda.time.DateTime;
|
|
|
-import org.joda.time.format.DateTimeFormat;
|
|
|
|
|
|
-import org.apache.hadoop.fs.FileStatus;
|
|
|
-import org.apache.hadoop.fs.Path;
|
|
|
-import org.apache.hadoop.fs.FileSystem;
|
|
|
+import org.apache.hadoop.classification.InterfaceAudience;
|
|
|
+import org.apache.hadoop.classification.InterfaceStability;
|
|
|
+import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.fs.FSDataInputStream;
|
|
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
|
|
-
|
|
|
+import org.apache.hadoop.fs.FileStatus;
|
|
|
+import org.apache.hadoop.fs.FileSystem;
|
|
|
+import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
|
|
|
+import org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes;
|
|
|
+import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
|
|
|
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
|
|
|
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
|
|
|
-import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TimeoutException;
|
|
|
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.FileSystemOperationUnhandledException;
|
|
|
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAbfsRestOperationException;
|
|
|
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidFileSystemPropertyException;
|
|
|
-import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsHttpService;
|
|
|
-import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsHttpClientFactory;
|
|
|
-import org.apache.hadoop.fs.azurebfs.contracts.services.ConfigurationService;
|
|
|
-import org.apache.hadoop.fs.azurebfs.contracts.services.TracingService;
|
|
|
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriAuthorityException;
|
|
|
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException;
|
|
|
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TimeoutException;
|
|
|
+import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
|
|
|
import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultEntrySchema;
|
|
|
import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema;
|
|
|
-import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
|
|
|
-
|
|
|
-import org.apache.hadoop.classification.InterfaceAudience;
|
|
|
-import org.apache.hadoop.classification.InterfaceStability;
|
|
|
-import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
|
|
|
-import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
|
|
|
+import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
|
|
|
+import org.apache.hadoop.fs.azurebfs.services.AbfsConfiguration;
|
|
|
+import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
|
|
|
+import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
|
|
|
+import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
|
|
|
+import org.apache.hadoop.fs.azurebfs.services.ExponentialRetryPolicy;
|
|
|
+import org.apache.hadoop.fs.azurebfs.services.SharedKeyCredentials;
|
|
|
import org.apache.hadoop.fs.permission.FsAction;
|
|
|
import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
-
|
|
|
+import org.apache.hadoop.security.UserGroupInformation;
|
|
|
+import org.apache.http.client.utils.URIBuilder;
|
|
|
+import org.joda.time.DateTime;
|
|
|
+import org.joda.time.format.DateTimeFormat;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
import static org.apache.hadoop.util.Time.now;
|
|
|
|
|
|
-@Singleton
|
|
|
-@InterfaceAudience.Private
|
|
|
+/**
|
|
|
+ * Provides the bridging logic between Hadoop's abstract filesystem and Azure Storage
|
|
|
+ */
|
|
|
+@InterfaceAudience.Public
|
|
|
@InterfaceStability.Evolving
|
|
|
-final class AbfsHttpServiceImpl implements AbfsHttpService {
|
|
|
- public static final Logger LOG = LoggerFactory.getLogger(AbfsHttpService.class);
|
|
|
+public class AzureBlobFileSystemStore {
|
|
|
+ private static final Logger LOG = LoggerFactory.getLogger(AzureBlobFileSystemStore.class);
|
|
|
+
|
|
|
+ private AbfsClient client;
|
|
|
+ private URI uri;
|
|
|
+ private final UserGroupInformation userGroupInformation;
|
|
|
private static final String DATE_TIME_PATTERN = "E, dd MMM yyyy HH:mm:ss 'GMT'";
|
|
|
private static final String XMS_PROPERTIES_ENCODING = "ISO-8859-1";
|
|
|
private static final int LIST_MAX_RESULTS = 5000;
|
|
|
private static final int DELETE_DIRECTORY_TIMEOUT_MILISECONDS = 180000;
|
|
|
private static final int RENAME_TIMEOUT_MILISECONDS = 180000;
|
|
|
|
|
|
- private final AbfsHttpClientFactory abfsHttpClientFactory;
|
|
|
- private final ConcurrentHashMap<AzureBlobFileSystem, AbfsClient> clientCache;
|
|
|
- private final ConfigurationService configurationService;
|
|
|
+ private final AbfsConfiguration abfsConfiguration;
|
|
|
private final Set<String> azureAtomicRenameDirSet;
|
|
|
|
|
|
- @Inject
|
|
|
- AbfsHttpServiceImpl(
|
|
|
- final ConfigurationService configurationService,
|
|
|
- final AbfsHttpClientFactory abfsHttpClientFactory,
|
|
|
- final TracingService tracingService) {
|
|
|
- Preconditions.checkNotNull(abfsHttpClientFactory, "abfsHttpClientFactory");
|
|
|
- Preconditions.checkNotNull(configurationService, "configurationService");
|
|
|
- Preconditions.checkNotNull(tracingService, "tracingService");
|
|
|
-
|
|
|
- this.configurationService = configurationService;
|
|
|
- this.clientCache = new ConcurrentHashMap<>();
|
|
|
- this.abfsHttpClientFactory = abfsHttpClientFactory;
|
|
|
- this.azureAtomicRenameDirSet = new HashSet<>(Arrays.asList(configurationService.getAzureAtomicRenameDirs().split(AbfsHttpConstants.COMMA)));
|
|
|
+
|
|
|
+ public AzureBlobFileSystemStore(URI uri, boolean isSeure, Configuration configuration, UserGroupInformation userGroupInformation)
|
|
|
+ throws AzureBlobFileSystemException {
|
|
|
+ this.uri = uri;
|
|
|
+ try {
|
|
|
+ this.abfsConfiguration = new AbfsConfiguration(configuration);
|
|
|
+ } catch (IllegalAccessException exception) {
|
|
|
+ throw new FileSystemOperationUnhandledException(exception);
|
|
|
+ }
|
|
|
+
|
|
|
+ this.userGroupInformation = userGroupInformation;
|
|
|
+ this.azureAtomicRenameDirSet = new HashSet<>(Arrays.asList(abfsConfiguration.getAzureAtomicRenameDirs().split(AbfsHttpConstants.COMMA)));
|
|
|
+
|
|
|
+ initializeClient(uri, isSeure);
|
|
|
+ }
|
|
|
+
|
|
|
+ @VisibleForTesting
|
|
|
+ URIBuilder getURIBuilder(final String hostName, boolean isSecure) {
|
|
|
+ String scheme = isSecure ? FileSystemUriSchemes.HTTPS_SCHEME : FileSystemUriSchemes.HTTP_SCHEME;
|
|
|
+
|
|
|
+ final URIBuilder uriBuilder = new URIBuilder();
|
|
|
+ uriBuilder.setScheme(scheme);
|
|
|
+ uriBuilder.setHost(hostName);
|
|
|
+
|
|
|
+ return uriBuilder;
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
- public Hashtable<String, String> getFilesystemProperties(final AzureBlobFileSystem azureBlobFileSystem)
|
|
|
- throws AzureBlobFileSystemException{
|
|
|
- final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem);
|
|
|
+ public AbfsConfiguration getAbfsConfiguration() {
|
|
|
+ return this.abfsConfiguration;
|
|
|
+ }
|
|
|
|
|
|
+ public Hashtable<String, String> getFilesystemProperties() throws AzureBlobFileSystemException {
|
|
|
this.LOG.debug(
|
|
|
- "getFilesystemProperties for filesystem: {}",
|
|
|
- client.getFileSystem());
|
|
|
+ "getFilesystemProperties for filesystem: {}",
|
|
|
+ client.getFileSystem());
|
|
|
|
|
|
final Hashtable<String, String> parsedXmsProperties;
|
|
|
|
|
@@ -126,19 +148,15 @@ final class AbfsHttpServiceImpl implements AbfsHttpService {
|
|
|
return parsedXmsProperties;
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
- public void setFilesystemProperties(final AzureBlobFileSystem azureBlobFileSystem, final Hashtable<String, String> properties) throws
|
|
|
- AzureBlobFileSystemException {
|
|
|
+ public void setFilesystemProperties(final Hashtable<String, String> properties) throws AzureBlobFileSystemException {
|
|
|
if (properties == null || properties.size() == 0) {
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem);
|
|
|
-
|
|
|
this.LOG.debug(
|
|
|
- "setFilesystemProperties for filesystem: {} with properties: {}",
|
|
|
- client.getFileSystem(),
|
|
|
- properties);
|
|
|
+ "setFilesystemProperties for filesystem: {} with properties: {}",
|
|
|
+ client.getFileSystem(),
|
|
|
+ properties);
|
|
|
|
|
|
final String commaSeparatedProperties;
|
|
|
try {
|
|
@@ -146,18 +164,15 @@ final class AbfsHttpServiceImpl implements AbfsHttpService {
|
|
|
} catch (CharacterCodingException ex) {
|
|
|
throw new InvalidAbfsRestOperationException(ex);
|
|
|
}
|
|
|
+
|
|
|
client.setFilesystemProperties(commaSeparatedProperties);
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
- public Hashtable<String, String> getPathProperties(final AzureBlobFileSystem azureBlobFileSystem, final Path path) throws
|
|
|
- AzureBlobFileSystemException {
|
|
|
- final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem);
|
|
|
-
|
|
|
+ public Hashtable<String, String> getPathProperties(final Path path) throws AzureBlobFileSystemException {
|
|
|
this.LOG.debug(
|
|
|
- "getPathProperties for filesystem: {} path: {}",
|
|
|
- client.getFileSystem(),
|
|
|
- path.toString());
|
|
|
+ "getPathProperties for filesystem: {} path: {}",
|
|
|
+ client.getFileSystem(),
|
|
|
+ path.toString());
|
|
|
|
|
|
final Hashtable<String, String> parsedXmsProperties;
|
|
|
final AbfsRestOperation op = client.getPathProperties(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path));
|
|
@@ -169,17 +184,12 @@ final class AbfsHttpServiceImpl implements AbfsHttpService {
|
|
|
return parsedXmsProperties;
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
- public void setPathProperties(final AzureBlobFileSystem azureBlobFileSystem, final Path path, final Hashtable<String,
|
|
|
- String> properties) throws
|
|
|
- AzureBlobFileSystemException {
|
|
|
- final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem);
|
|
|
-
|
|
|
+ public void setPathProperties(final Path path, final Hashtable<String, String> properties) throws AzureBlobFileSystemException {
|
|
|
this.LOG.debug(
|
|
|
- "setFilesystemProperties for filesystem: {} path: {} with properties: {}",
|
|
|
- client.getFileSystem(),
|
|
|
- path.toString(),
|
|
|
- properties);
|
|
|
+ "setFilesystemProperties for filesystem: {} path: {} with properties: {}",
|
|
|
+ client.getFileSystem(),
|
|
|
+ path.toString(),
|
|
|
+ properties);
|
|
|
|
|
|
final String commaSeparatedProperties;
|
|
|
try {
|
|
@@ -190,71 +200,55 @@ final class AbfsHttpServiceImpl implements AbfsHttpService {
|
|
|
client.setPathProperties("/" + getRelativePath(path), commaSeparatedProperties);
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
- public void createFilesystem(final AzureBlobFileSystem azureBlobFileSystem) throws AzureBlobFileSystemException {
|
|
|
- final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem);
|
|
|
-
|
|
|
+ public void createFilesystem() throws AzureBlobFileSystemException {
|
|
|
this.LOG.debug(
|
|
|
- "createFilesystem for filesystem: {}",
|
|
|
- client.getFileSystem());
|
|
|
+ "createFilesystem for filesystem: {}",
|
|
|
+ client.getFileSystem());
|
|
|
|
|
|
client.createFilesystem();
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
- public void deleteFilesystem(final AzureBlobFileSystem azureBlobFileSystem) throws AzureBlobFileSystemException {
|
|
|
- final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem);
|
|
|
-
|
|
|
+ public void deleteFilesystem() throws AzureBlobFileSystemException {
|
|
|
this.LOG.debug(
|
|
|
- "deleteFilesystem for filesystem: {}",
|
|
|
- client.getFileSystem());
|
|
|
+ "deleteFilesystem for filesystem: {}",
|
|
|
+ client.getFileSystem());
|
|
|
|
|
|
client.deleteFilesystem();
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
- public OutputStream createFile(final AzureBlobFileSystem azureBlobFileSystem, final Path path, final boolean overwrite) throws
|
|
|
- AzureBlobFileSystemException {
|
|
|
- final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem);
|
|
|
-
|
|
|
+ public OutputStream createFile(final Path path, final boolean overwrite) throws AzureBlobFileSystemException {
|
|
|
this.LOG.debug(
|
|
|
- "createFile filesystem: {} path: {} overwrite: {}",
|
|
|
- client.getFileSystem(),
|
|
|
- path.toString(),
|
|
|
- overwrite);
|
|
|
+ "createFile filesystem: {} path: {} overwrite: {}",
|
|
|
+ client.getFileSystem(),
|
|
|
+ path.toString(),
|
|
|
+ overwrite);
|
|
|
|
|
|
client.createPath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), true, overwrite);
|
|
|
|
|
|
final OutputStream outputStream;
|
|
|
outputStream = new FSDataOutputStream(
|
|
|
- new AbfsOutputStream(client, AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), 0,
|
|
|
- configurationService.getWriteBufferSize()), null);
|
|
|
+ new AbfsOutputStream(client, AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), 0,
|
|
|
+ abfsConfiguration.getWriteBufferSize()), null);
|
|
|
return outputStream;
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
- public Void createDirectory(final AzureBlobFileSystem azureBlobFileSystem, final Path path) throws AzureBlobFileSystemException {
|
|
|
- final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem);
|
|
|
-
|
|
|
+ public Void createDirectory(final Path path) throws AzureBlobFileSystemException {
|
|
|
this.LOG.debug(
|
|
|
- "createDirectory filesystem: {} path: {} overwrite: {}",
|
|
|
- client.getFileSystem(),
|
|
|
- path.toString());
|
|
|
+ "createDirectory filesystem: {} path: {} overwrite: {}",
|
|
|
+ client.getFileSystem(),
|
|
|
+ path.toString());
|
|
|
|
|
|
client.createPath("/" + getRelativePath(path), false, true);
|
|
|
|
|
|
return null;
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
- public InputStream openFileForRead(final AzureBlobFileSystem azureBlobFileSystem, final Path path,
|
|
|
- final FileSystem.Statistics statistics) throws AzureBlobFileSystemException {
|
|
|
- final AbfsClient client = getOrCreateClient(azureBlobFileSystem);
|
|
|
+ public InputStream openFileForRead(final Path path, final FileSystem.Statistics statistics) throws AzureBlobFileSystemException {
|
|
|
|
|
|
this.LOG.debug(
|
|
|
- "openFileForRead filesystem: {} path: {}",
|
|
|
- client.getFileSystem(),
|
|
|
- path.toString());
|
|
|
+ "openFileForRead filesystem: {} path: {}",
|
|
|
+ client.getFileSystem(),
|
|
|
+ path.toString());
|
|
|
|
|
|
final AbfsRestOperation op = client.getPathProperties(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path));
|
|
|
|
|
@@ -264,28 +258,25 @@ final class AbfsHttpServiceImpl implements AbfsHttpService {
|
|
|
|
|
|
if (parseIsDirectory(resourceType)) {
|
|
|
throw new AbfsRestOperationException(
|
|
|
- AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(),
|
|
|
- AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(),
|
|
|
- "openFileForRead must be used with files and not directories",
|
|
|
- null);
|
|
|
+ AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(),
|
|
|
+ AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(),
|
|
|
+ "openFileForRead must be used with files and not directories",
|
|
|
+ null);
|
|
|
}
|
|
|
|
|
|
// Add statistics for InputStream
|
|
|
return new FSDataInputStream(
|
|
|
- new AbfsInputStream(client, statistics, AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), contentLength,
|
|
|
- configurationService.getReadBufferSize(), configurationService.getReadAheadQueueDepth(), eTag));
|
|
|
+ new AbfsInputStream(client, statistics, AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), contentLength,
|
|
|
+ abfsConfiguration.getReadBufferSize(), abfsConfiguration.getReadAheadQueueDepth(), eTag));
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
- public OutputStream openFileForWrite(final AzureBlobFileSystem azureBlobFileSystem, final Path path, final boolean overwrite) throws
|
|
|
- AzureBlobFileSystemException {
|
|
|
- final AbfsClient client = getOrCreateClient(azureBlobFileSystem);
|
|
|
-
|
|
|
+ public OutputStream openFileForWrite(final Path path, final boolean overwrite) throws
|
|
|
+ AzureBlobFileSystemException {
|
|
|
this.LOG.debug(
|
|
|
- "openFileForWrite filesystem: {} path: {} overwrite: {}",
|
|
|
- client.getFileSystem(),
|
|
|
- path.toString(),
|
|
|
- overwrite);
|
|
|
+ "openFileForWrite filesystem: {} path: {} overwrite: {}",
|
|
|
+ client.getFileSystem(),
|
|
|
+ path.toString(),
|
|
|
+ overwrite);
|
|
|
|
|
|
final AbfsRestOperation op = client.getPathProperties(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path));
|
|
|
|
|
@@ -294,37 +285,34 @@ final class AbfsHttpServiceImpl implements AbfsHttpService {
|
|
|
|
|
|
if (parseIsDirectory(resourceType)) {
|
|
|
throw new AbfsRestOperationException(
|
|
|
- AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(),
|
|
|
- AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(),
|
|
|
- "openFileForRead must be used with files and not directories",
|
|
|
- null);
|
|
|
+ AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(),
|
|
|
+ AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(),
|
|
|
+ "openFileForRead must be used with files and not directories",
|
|
|
+ null);
|
|
|
}
|
|
|
|
|
|
final long offset = overwrite ? 0 : contentLength;
|
|
|
|
|
|
final OutputStream outputStream;
|
|
|
outputStream = new FSDataOutputStream(
|
|
|
- new AbfsOutputStream(client, AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path),
|
|
|
- offset, configurationService.getWriteBufferSize()), null);
|
|
|
+ new AbfsOutputStream(client, AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path),
|
|
|
+ offset, abfsConfiguration.getWriteBufferSize()), null);
|
|
|
return outputStream;
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
- public void rename(final AzureBlobFileSystem azureBlobFileSystem, final Path source, final Path destination) throws
|
|
|
- AzureBlobFileSystemException {
|
|
|
+ public void rename(final Path source, final Path destination) throws
|
|
|
+ AzureBlobFileSystemException {
|
|
|
|
|
|
if (isAtomicRenameKey(source.getName())) {
|
|
|
this.LOG.warn("The atomic rename feature is not supported by the ABFS scheme; however rename,"
|
|
|
- +" create and delete operations are atomic if Namespace is enabled for your Azure Storage account.");
|
|
|
+ +" create and delete operations are atomic if Namespace is enabled for your Azure Storage account.");
|
|
|
}
|
|
|
|
|
|
- final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem);
|
|
|
-
|
|
|
this.LOG.debug(
|
|
|
- "renameAsync filesystem: {} source: {} destination: {}",
|
|
|
- client.getFileSystem(),
|
|
|
- source.toString(),
|
|
|
- destination.toString());
|
|
|
+ "renameAsync filesystem: {} source: {} destination: {}",
|
|
|
+ client.getFileSystem(),
|
|
|
+ source.toString(),
|
|
|
+ destination.toString());
|
|
|
|
|
|
String continuation = null;
|
|
|
long deadline = now() + RENAME_TIMEOUT_MILISECONDS;
|
|
@@ -332,30 +320,28 @@ final class AbfsHttpServiceImpl implements AbfsHttpService {
|
|
|
do {
|
|
|
if (now() > deadline) {
|
|
|
LOG.debug(
|
|
|
- "Rename {} to {} timed out.",
|
|
|
- source,
|
|
|
- destination);
|
|
|
+ "Rename {} to {} timed out.",
|
|
|
+ source,
|
|
|
+ destination);
|
|
|
|
|
|
throw new TimeoutException("Rename timed out.");
|
|
|
}
|
|
|
|
|
|
AbfsRestOperation op = client.renamePath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(source),
|
|
|
- AbfsHttpConstants.FORWARD_SLASH + getRelativePath(destination), continuation);
|
|
|
+ AbfsHttpConstants.FORWARD_SLASH + getRelativePath(destination), continuation);
|
|
|
continuation = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION);
|
|
|
|
|
|
} while (continuation != null && !continuation.isEmpty());
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
- public void delete(final AzureBlobFileSystem azureBlobFileSystem, final Path path, final boolean recursive) throws
|
|
|
- AzureBlobFileSystemException {
|
|
|
- final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem);
|
|
|
+ public void delete(final Path path, final boolean recursive) throws
|
|
|
+ AzureBlobFileSystemException {
|
|
|
|
|
|
this.LOG.debug(
|
|
|
- "delete filesystem: {} path: {} recursive: {}",
|
|
|
- client.getFileSystem(),
|
|
|
- path.toString(),
|
|
|
- String.valueOf(recursive));
|
|
|
+ "delete filesystem: {} path: {} recursive: {}",
|
|
|
+ client.getFileSystem(),
|
|
|
+ path.toString(),
|
|
|
+ String.valueOf(recursive));
|
|
|
|
|
|
String continuation = null;
|
|
|
long deadline = now() + DELETE_DIRECTORY_TIMEOUT_MILISECONDS;
|
|
@@ -363,7 +349,7 @@ final class AbfsHttpServiceImpl implements AbfsHttpService {
|
|
|
do {
|
|
|
if (now() > deadline) {
|
|
|
this.LOG.debug(
|
|
|
- "Delete directory {} timed out.", path);
|
|
|
+ "Delete directory {} timed out.", path);
|
|
|
|
|
|
throw new TimeoutException("Delete directory timed out.");
|
|
|
}
|
|
@@ -374,60 +360,55 @@ final class AbfsHttpServiceImpl implements AbfsHttpService {
|
|
|
} while (continuation != null && !continuation.isEmpty());
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
- public FileStatus getFileStatus(final AzureBlobFileSystem azureBlobFileSystem, final Path path) throws AzureBlobFileSystemException {
|
|
|
- final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem);
|
|
|
+ public FileStatus getFileStatus(final Path path) throws IOException {
|
|
|
|
|
|
this.LOG.debug(
|
|
|
- "getFileStatus filesystem: {} path: {}",
|
|
|
- client.getFileSystem(),
|
|
|
- path.toString());
|
|
|
+ "getFileStatus filesystem: {} path: {}",
|
|
|
+ client.getFileSystem(),
|
|
|
+ path.toString());
|
|
|
|
|
|
if (path.isRoot()) {
|
|
|
AbfsRestOperation op = client.getFilesystemProperties();
|
|
|
- final long blockSize = configurationService.getAzureBlockSize();
|
|
|
+ final long blockSize = abfsConfiguration.getAzureBlockSize();
|
|
|
final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
|
|
|
final String lastModified = op.getResult().getResponseHeader(HttpHeaderConfigurations.LAST_MODIFIED);
|
|
|
return new VersionedFileStatus(
|
|
|
- azureBlobFileSystem.getOwnerUser(),
|
|
|
- azureBlobFileSystem.getOwnerUserPrimaryGroup(),
|
|
|
- 0,
|
|
|
- true,
|
|
|
- 1,
|
|
|
- blockSize,
|
|
|
- parseLastModifiedTime(lastModified).getMillis(),
|
|
|
- path,
|
|
|
- eTag);
|
|
|
+ userGroupInformation.getUserName(),
|
|
|
+ userGroupInformation.getPrimaryGroupName(),
|
|
|
+ 0,
|
|
|
+ true,
|
|
|
+ 1,
|
|
|
+ blockSize,
|
|
|
+ parseLastModifiedTime(lastModified).getMillis(),
|
|
|
+ path,
|
|
|
+ eTag);
|
|
|
} else {
|
|
|
AbfsRestOperation op = client.getPathProperties(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path));
|
|
|
|
|
|
- final long blockSize = configurationService.getAzureBlockSize();
|
|
|
+ final long blockSize = abfsConfiguration.getAzureBlockSize();
|
|
|
final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
|
|
|
final String lastModified = op.getResult().getResponseHeader(HttpHeaderConfigurations.LAST_MODIFIED);
|
|
|
final String contentLength = op.getResult().getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH);
|
|
|
final String resourceType = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
|
|
|
|
|
|
return new VersionedFileStatus(
|
|
|
- azureBlobFileSystem.getOwnerUser(),
|
|
|
- azureBlobFileSystem.getOwnerUserPrimaryGroup(),
|
|
|
- parseContentLength(contentLength),
|
|
|
- parseIsDirectory(resourceType),
|
|
|
- 1,
|
|
|
- blockSize,
|
|
|
- parseLastModifiedTime(lastModified).getMillis(),
|
|
|
- path,
|
|
|
- eTag);
|
|
|
+ userGroupInformation.getUserName(),
|
|
|
+ userGroupInformation.getPrimaryGroupName(),
|
|
|
+ parseContentLength(contentLength),
|
|
|
+ parseIsDirectory(resourceType),
|
|
|
+ 1,
|
|
|
+ blockSize,
|
|
|
+ parseLastModifiedTime(lastModified).getMillis(),
|
|
|
+ path,
|
|
|
+ eTag);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
- public FileStatus[] listStatus(final AzureBlobFileSystem azureBlobFileSystem, final Path path) throws AzureBlobFileSystemException {
|
|
|
- final AbfsClient client = this.getOrCreateClient(azureBlobFileSystem);
|
|
|
-
|
|
|
+ public FileStatus[] listStatus(final Path path) throws IOException {
|
|
|
this.LOG.debug(
|
|
|
- "listStatus filesystem: {} path: {}",
|
|
|
- client.getFileSystem(),
|
|
|
- path.toString());
|
|
|
+ "listStatus filesystem: {} path: {}",
|
|
|
+ client.getFileSystem(),
|
|
|
+ path.toString());
|
|
|
|
|
|
String relativePath = path.isRoot() ? AbfsHttpConstants.EMPTY_STRING : getRelativePath(path);
|
|
|
String continuation = null;
|
|
@@ -439,13 +420,13 @@ final class AbfsHttpServiceImpl implements AbfsHttpService {
|
|
|
ListResultSchema retrievedSchema = op.getResult().getListResultSchema();
|
|
|
if (retrievedSchema == null) {
|
|
|
throw new AbfsRestOperationException(
|
|
|
- AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(),
|
|
|
- AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(),
|
|
|
- "listStatusAsync path not found",
|
|
|
- null, op.getResult());
|
|
|
+ AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(),
|
|
|
+ AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(),
|
|
|
+ "listStatusAsync path not found",
|
|
|
+ null, op.getResult());
|
|
|
}
|
|
|
|
|
|
- long blockSize = configurationService.getAzureBlockSize();
|
|
|
+ long blockSize = abfsConfiguration.getAzureBlockSize();
|
|
|
|
|
|
for (ListResultEntrySchema entry : retrievedSchema.paths()) {
|
|
|
long lastModifiedMillis = 0;
|
|
@@ -453,22 +434,25 @@ final class AbfsHttpServiceImpl implements AbfsHttpService {
|
|
|
boolean isDirectory = entry.isDirectory() == null ? false : entry.isDirectory();
|
|
|
if (entry.lastModified() != null && !entry.lastModified().isEmpty()) {
|
|
|
final DateTime dateTime = DateTime.parse(
|
|
|
- entry.lastModified(),
|
|
|
- DateTimeFormat.forPattern(DATE_TIME_PATTERN).withZoneUTC());
|
|
|
+ entry.lastModified(),
|
|
|
+ DateTimeFormat.forPattern(DATE_TIME_PATTERN).withZoneUTC());
|
|
|
lastModifiedMillis = dateTime.getMillis();
|
|
|
}
|
|
|
|
|
|
+ Path entryPath = new Path(File.separator + entry.name());
|
|
|
+ entryPath = entryPath.makeQualified(this.uri, entryPath);
|
|
|
+
|
|
|
fileStatuses.add(
|
|
|
- new VersionedFileStatus(
|
|
|
- azureBlobFileSystem.getOwnerUser(),
|
|
|
- azureBlobFileSystem.getOwnerUserPrimaryGroup(),
|
|
|
- contentLength,
|
|
|
- isDirectory,
|
|
|
- 1,
|
|
|
- blockSize,
|
|
|
- lastModifiedMillis,
|
|
|
- azureBlobFileSystem.makeQualified(new Path(File.separator + entry.name())),
|
|
|
- entry.eTag()));
|
|
|
+ new VersionedFileStatus(
|
|
|
+ userGroupInformation.getUserName(),
|
|
|
+ userGroupInformation.getPrimaryGroupName(),
|
|
|
+ contentLength,
|
|
|
+ isDirectory,
|
|
|
+ 1,
|
|
|
+ blockSize,
|
|
|
+ lastModifiedMillis,
|
|
|
+ entryPath,
|
|
|
+ entry.eTag()));
|
|
|
}
|
|
|
|
|
|
} while (continuation != null && !continuation.isEmpty());
|
|
@@ -476,16 +460,55 @@ final class AbfsHttpServiceImpl implements AbfsHttpService {
|
|
|
return fileStatuses.toArray(new FileStatus[0]);
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
- public synchronized void closeFileSystem(final AzureBlobFileSystem azureBlobFileSystem) throws AzureBlobFileSystemException {
|
|
|
- this.clientCache.remove(azureBlobFileSystem);
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
public boolean isAtomicRenameKey(String key) {
|
|
|
return isKeyForDirectorySet(key, azureAtomicRenameDirSet);
|
|
|
}
|
|
|
|
|
|
+ private void initializeClient(URI uri, boolean isSeure) throws AzureBlobFileSystemException {
|
|
|
+ if (this.client != null) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ final String authority = uri.getRawAuthority();
|
|
|
+ if (null == authority) {
|
|
|
+ throw new InvalidUriAuthorityException(uri.toString());
|
|
|
+ }
|
|
|
+
|
|
|
+ if (!authority.contains(AbfsHttpConstants.AZURE_DISTRIBUTED_FILE_SYSTEM_AUTHORITY_DELIMITER)) {
|
|
|
+ throw new InvalidUriAuthorityException(uri.toString());
|
|
|
+ }
|
|
|
+
|
|
|
+ final String[] authorityParts = authority.split(AbfsHttpConstants.AZURE_DISTRIBUTED_FILE_SYSTEM_AUTHORITY_DELIMITER, 2);
|
|
|
+
|
|
|
+ if (authorityParts.length < 2 || "".equals(authorityParts[0])) {
|
|
|
+ final String errMsg = String
|
|
|
+ .format("URI '%s' has a malformed authority, expected container name. "
|
|
|
+ + "Authority takes the form "+ FileSystemUriSchemes.ABFS_SCHEME + "://[<container name>@]<account name>",
|
|
|
+ uri.toString());
|
|
|
+ throw new InvalidUriException(errMsg);
|
|
|
+ }
|
|
|
+
|
|
|
+ final String fileSystemName = authorityParts[0];
|
|
|
+ final String accountName = authorityParts[1];
|
|
|
+
|
|
|
+ final URIBuilder uriBuilder = getURIBuilder(accountName, isSeure);
|
|
|
+
|
|
|
+ final String url = uriBuilder.toString() + AbfsHttpConstants.FORWARD_SLASH + fileSystemName;
|
|
|
+
|
|
|
+ URL baseUrl;
|
|
|
+ try {
|
|
|
+ baseUrl = new URL(url);
|
|
|
+ } catch (MalformedURLException e) {
|
|
|
+ throw new InvalidUriException(String.format("URI '%s' is malformed", uri.toString()));
|
|
|
+ }
|
|
|
+
|
|
|
+ SharedKeyCredentials creds =
|
|
|
+ new SharedKeyCredentials(accountName.substring(0, accountName.indexOf(AbfsHttpConstants.DOT)),
|
|
|
+ this.abfsConfiguration.getStorageAccountKey(accountName));
|
|
|
+
|
|
|
+ this.client = new AbfsClient(baseUrl, creds, abfsConfiguration, new ExponentialRetryPolicy());
|
|
|
+ }
|
|
|
+
|
|
|
private String getRelativePath(final Path path) {
|
|
|
Preconditions.checkNotNull(path, "path");
|
|
|
final String relativePath = path.toUri().getPath();
|
|
@@ -505,23 +528,6 @@ final class AbfsHttpServiceImpl implements AbfsHttpService {
|
|
|
return relativePath;
|
|
|
}
|
|
|
|
|
|
- private synchronized AbfsClient getOrCreateClient(final AzureBlobFileSystem azureBlobFileSystem) throws
|
|
|
- AzureBlobFileSystemException {
|
|
|
- Preconditions.checkNotNull(azureBlobFileSystem, "azureBlobFileSystem");
|
|
|
-
|
|
|
- AbfsClient client = this.clientCache.get(azureBlobFileSystem);
|
|
|
-
|
|
|
- if (client != null) {
|
|
|
- return client;
|
|
|
- }
|
|
|
-
|
|
|
- client = abfsHttpClientFactory.create(azureBlobFileSystem);
|
|
|
- this.clientCache.put(
|
|
|
- azureBlobFileSystem,
|
|
|
- client);
|
|
|
- return client;
|
|
|
- }
|
|
|
-
|
|
|
private long parseContentLength(final String contentLength) {
|
|
|
if (contentLength == null) {
|
|
|
return -1;
|
|
@@ -536,12 +542,12 @@ final class AbfsHttpServiceImpl implements AbfsHttpService {
|
|
|
|
|
|
private DateTime parseLastModifiedTime(final String lastModifiedTime) {
|
|
|
return DateTime.parse(
|
|
|
- lastModifiedTime,
|
|
|
- DateTimeFormat.forPattern(DATE_TIME_PATTERN).withZoneUTC());
|
|
|
+ lastModifiedTime,
|
|
|
+ DateTimeFormat.forPattern(DATE_TIME_PATTERN).withZoneUTC());
|
|
|
}
|
|
|
|
|
|
private String convertXmsPropertiesToCommaSeparatedString(final Hashtable<String, String> properties) throws
|
|
|
- CharacterCodingException {
|
|
|
+ CharacterCodingException {
|
|
|
StringBuilder commaSeparatedProperties = new StringBuilder();
|
|
|
|
|
|
final CharsetEncoder encoder = Charset.forName(XMS_PROPERTIES_ENCODING).newEncoder();
|
|
@@ -571,7 +577,7 @@ final class AbfsHttpServiceImpl implements AbfsHttpService {
|
|
|
}
|
|
|
|
|
|
private Hashtable<String, String> parseCommaSeparatedXmsProperties(String xMsProperties) throws
|
|
|
- InvalidFileSystemPropertyException, InvalidAbfsRestOperationException {
|
|
|
+ InvalidFileSystemPropertyException, InvalidAbfsRestOperationException {
|
|
|
Hashtable<String, String> properties = new Hashtable<>();
|
|
|
|
|
|
final CharsetDecoder decoder = Charset.forName(XMS_PROPERTIES_ENCODING).newDecoder();
|
|
@@ -633,15 +639,15 @@ final class AbfsHttpServiceImpl implements AbfsHttpService {
|
|
|
private final String version;
|
|
|
|
|
|
VersionedFileStatus(
|
|
|
- final String owner, final String group,
|
|
|
- final long length, final boolean isdir, final int blockReplication,
|
|
|
- final long blocksize, final long modificationTime, final Path path,
|
|
|
- String version) {
|
|
|
+ final String owner, final String group,
|
|
|
+ final long length, final boolean isdir, final int blockReplication,
|
|
|
+ final long blocksize, final long modificationTime, final Path path,
|
|
|
+ String version) {
|
|
|
super(length, isdir, blockReplication, blocksize, modificationTime, 0,
|
|
|
- new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL),
|
|
|
- owner,
|
|
|
- group,
|
|
|
- path);
|
|
|
+ new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL),
|
|
|
+ owner,
|
|
|
+ group,
|
|
|
+ path);
|
|
|
|
|
|
this.version = version;
|
|
|
}
|
|
@@ -690,4 +696,6 @@ final class AbfsHttpServiceImpl implements AbfsHttpService {
|
|
|
return this.version;
|
|
|
}
|
|
|
}
|
|
|
-}
|
|
|
+
|
|
|
+
|
|
|
+}
|