|
@@ -30,6 +30,7 @@ import java.util.Hashtable;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
import java.util.ArrayList;
|
|
import java.util.ArrayList;
|
|
import java.util.EnumSet;
|
|
import java.util.EnumSet;
|
|
|
|
+import java.util.Map;
|
|
import java.util.concurrent.Callable;
|
|
import java.util.concurrent.Callable;
|
|
import java.util.concurrent.ExecutionException;
|
|
import java.util.concurrent.ExecutionException;
|
|
import java.util.concurrent.ExecutorService;
|
|
import java.util.concurrent.ExecutorService;
|
|
@@ -68,6 +69,7 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException;
|
|
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.SASTokenProviderException;
|
|
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.SASTokenProviderException;
|
|
import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
|
|
import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
|
|
import org.apache.hadoop.fs.azurebfs.security.AbfsDelegationTokenManager;
|
|
import org.apache.hadoop.fs.azurebfs.security.AbfsDelegationTokenManager;
|
|
|
|
+import org.apache.hadoop.fs.azurebfs.services.AbfsCounters;
|
|
import org.apache.hadoop.fs.permission.AclEntry;
|
|
import org.apache.hadoop.fs.permission.AclEntry;
|
|
import org.apache.hadoop.fs.permission.AclStatus;
|
|
import org.apache.hadoop.fs.permission.AclStatus;
|
|
import org.apache.hadoop.fs.permission.FsAction;
|
|
import org.apache.hadoop.fs.permission.FsAction;
|
|
@@ -78,6 +80,7 @@ import org.apache.hadoop.security.token.Token;
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
import org.apache.hadoop.util.Progressable;
|
|
import org.apache.hadoop.util.Progressable;
|
|
|
|
|
|
|
|
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.*;
|
|
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
|
|
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -94,6 +97,7 @@ public class AzureBlobFileSystem extends FileSystem {
|
|
|
|
|
|
private boolean delegationTokenEnabled = false;
|
|
private boolean delegationTokenEnabled = false;
|
|
private AbfsDelegationTokenManager delegationTokenManager;
|
|
private AbfsDelegationTokenManager delegationTokenManager;
|
|
|
|
+ private AbfsCounters instrumentation;
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public void initialize(URI uri, Configuration configuration)
|
|
public void initialize(URI uri, Configuration configuration)
|
|
@@ -109,7 +113,7 @@ public class AzureBlobFileSystem extends FileSystem {
|
|
LOG.trace("AzureBlobFileSystemStore init complete");
|
|
LOG.trace("AzureBlobFileSystemStore init complete");
|
|
|
|
|
|
final AbfsConfiguration abfsConfiguration = abfsStore.getAbfsConfiguration();
|
|
final AbfsConfiguration abfsConfiguration = abfsStore.getAbfsConfiguration();
|
|
-
|
|
|
|
|
|
+ instrumentation = new AbfsInstrumentation(uri);
|
|
this.setWorkingDirectory(this.getHomeDirectory());
|
|
this.setWorkingDirectory(this.getHomeDirectory());
|
|
|
|
|
|
if (abfsConfiguration.getCreateRemoteFileSystemDuringInitialization()) {
|
|
if (abfsConfiguration.getCreateRemoteFileSystemDuringInitialization()) {
|
|
@@ -146,6 +150,11 @@ public class AzureBlobFileSystem extends FileSystem {
|
|
sb.append("uri=").append(uri);
|
|
sb.append("uri=").append(uri);
|
|
sb.append(", user='").append(abfsStore.getUser()).append('\'');
|
|
sb.append(", user='").append(abfsStore.getUser()).append('\'');
|
|
sb.append(", primaryUserGroup='").append(abfsStore.getPrimaryGroup()).append('\'');
|
|
sb.append(", primaryUserGroup='").append(abfsStore.getPrimaryGroup()).append('\'');
|
|
|
|
+ if (instrumentation != null) {
|
|
|
|
+ sb.append(", Statistics: {").append(instrumentation.formString("{", "=",
|
|
|
|
+ "}", true));
|
|
|
|
+ sb.append("}");
|
|
|
|
+ }
|
|
sb.append('}');
|
|
sb.append('}');
|
|
return sb.toString();
|
|
return sb.toString();
|
|
}
|
|
}
|
|
@@ -162,7 +171,7 @@ public class AzureBlobFileSystem extends FileSystem {
|
|
@Override
|
|
@Override
|
|
public FSDataInputStream open(final Path path, final int bufferSize) throws IOException {
|
|
public FSDataInputStream open(final Path path, final int bufferSize) throws IOException {
|
|
LOG.debug("AzureBlobFileSystem.open path: {} bufferSize: {}", path, bufferSize);
|
|
LOG.debug("AzureBlobFileSystem.open path: {} bufferSize: {}", path, bufferSize);
|
|
-
|
|
|
|
|
|
+ statIncrement(CALL_OPEN);
|
|
Path qualifiedPath = makeQualified(path);
|
|
Path qualifiedPath = makeQualified(path);
|
|
|
|
|
|
try {
|
|
try {
|
|
@@ -183,6 +192,7 @@ public class AzureBlobFileSystem extends FileSystem {
|
|
overwrite,
|
|
overwrite,
|
|
blockSize);
|
|
blockSize);
|
|
|
|
|
|
|
|
+ statIncrement(CALL_CREATE);
|
|
trailingPeriodCheck(f);
|
|
trailingPeriodCheck(f);
|
|
|
|
|
|
Path qualifiedPath = makeQualified(f);
|
|
Path qualifiedPath = makeQualified(f);
|
|
@@ -190,6 +200,7 @@ public class AzureBlobFileSystem extends FileSystem {
|
|
try {
|
|
try {
|
|
OutputStream outputStream = abfsStore.createFile(qualifiedPath, statistics, overwrite,
|
|
OutputStream outputStream = abfsStore.createFile(qualifiedPath, statistics, overwrite,
|
|
permission == null ? FsPermission.getFileDefault() : permission, FsPermission.getUMask(getConf()));
|
|
permission == null ? FsPermission.getFileDefault() : permission, FsPermission.getUMask(getConf()));
|
|
|
|
+ statIncrement(FILES_CREATED);
|
|
return new FSDataOutputStream(outputStream, statistics);
|
|
return new FSDataOutputStream(outputStream, statistics);
|
|
} catch(AzureBlobFileSystemException ex) {
|
|
} catch(AzureBlobFileSystemException ex) {
|
|
checkException(f, ex);
|
|
checkException(f, ex);
|
|
@@ -203,6 +214,7 @@ public class AzureBlobFileSystem extends FileSystem {
|
|
final boolean overwrite, final int bufferSize, final short replication, final long blockSize,
|
|
final boolean overwrite, final int bufferSize, final short replication, final long blockSize,
|
|
final Progressable progress) throws IOException {
|
|
final Progressable progress) throws IOException {
|
|
|
|
|
|
|
|
+ statIncrement(CALL_CREATE_NON_RECURSIVE);
|
|
final Path parent = f.getParent();
|
|
final Path parent = f.getParent();
|
|
final FileStatus parentFileStatus = tryGetFileStatus(parent);
|
|
final FileStatus parentFileStatus = tryGetFileStatus(parent);
|
|
|
|
|
|
@@ -246,7 +258,7 @@ public class AzureBlobFileSystem extends FileSystem {
|
|
"AzureBlobFileSystem.append path: {} bufferSize: {}",
|
|
"AzureBlobFileSystem.append path: {} bufferSize: {}",
|
|
f.toString(),
|
|
f.toString(),
|
|
bufferSize);
|
|
bufferSize);
|
|
-
|
|
|
|
|
|
+ statIncrement(CALL_APPEND);
|
|
Path qualifiedPath = makeQualified(f);
|
|
Path qualifiedPath = makeQualified(f);
|
|
|
|
|
|
try {
|
|
try {
|
|
@@ -261,7 +273,7 @@ public class AzureBlobFileSystem extends FileSystem {
|
|
public boolean rename(final Path src, final Path dst) throws IOException {
|
|
public boolean rename(final Path src, final Path dst) throws IOException {
|
|
LOG.debug(
|
|
LOG.debug(
|
|
"AzureBlobFileSystem.rename src: {} dst: {}", src.toString(), dst.toString());
|
|
"AzureBlobFileSystem.rename src: {} dst: {}", src.toString(), dst.toString());
|
|
-
|
|
|
|
|
|
+ statIncrement(CALL_RENAME);
|
|
trailingPeriodCheck(dst);
|
|
trailingPeriodCheck(dst);
|
|
|
|
|
|
Path parentFolder = src.getParent();
|
|
Path parentFolder = src.getParent();
|
|
@@ -328,7 +340,7 @@ public class AzureBlobFileSystem extends FileSystem {
|
|
public boolean delete(final Path f, final boolean recursive) throws IOException {
|
|
public boolean delete(final Path f, final boolean recursive) throws IOException {
|
|
LOG.debug(
|
|
LOG.debug(
|
|
"AzureBlobFileSystem.delete path: {} recursive: {}", f.toString(), recursive);
|
|
"AzureBlobFileSystem.delete path: {} recursive: {}", f.toString(), recursive);
|
|
-
|
|
|
|
|
|
+ statIncrement(CALL_DELETE);
|
|
Path qualifiedPath = makeQualified(f);
|
|
Path qualifiedPath = makeQualified(f);
|
|
|
|
|
|
if (f.isRoot()) {
|
|
if (f.isRoot()) {
|
|
@@ -353,7 +365,7 @@ public class AzureBlobFileSystem extends FileSystem {
|
|
public FileStatus[] listStatus(final Path f) throws IOException {
|
|
public FileStatus[] listStatus(final Path f) throws IOException {
|
|
LOG.debug(
|
|
LOG.debug(
|
|
"AzureBlobFileSystem.listStatus path: {}", f.toString());
|
|
"AzureBlobFileSystem.listStatus path: {}", f.toString());
|
|
-
|
|
|
|
|
|
+ statIncrement(CALL_LIST_STATUS);
|
|
Path qualifiedPath = makeQualified(f);
|
|
Path qualifiedPath = makeQualified(f);
|
|
|
|
|
|
try {
|
|
try {
|
|
@@ -365,6 +377,24 @@ public class AzureBlobFileSystem extends FileSystem {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Increment of an Abfs statistic.
|
|
|
|
+ *
|
|
|
|
+ * @param statistic AbfsStatistic that needs increment.
|
|
|
|
+ */
|
|
|
|
+ private void statIncrement(AbfsStatistic statistic) {
|
|
|
|
+ incrementStatistic(statistic);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Method for incrementing AbfsStatistic by a long value.
|
|
|
|
+ *
|
|
|
|
+ * @param statistic the Statistic to be incremented.
|
|
|
|
+ */
|
|
|
|
+ private void incrementStatistic(AbfsStatistic statistic) {
|
|
|
|
+ instrumentation.incrementCounter(statistic, 1);
|
|
|
|
+ }
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Performs a check for (.) until root in the path to throw an exception.
|
|
* Performs a check for (.) until root in the path to throw an exception.
|
|
* The purpose is to differentiate between dir/dir1 and dir/dir1.
|
|
* The purpose is to differentiate between dir/dir1 and dir/dir1.
|
|
@@ -394,7 +424,7 @@ public class AzureBlobFileSystem extends FileSystem {
|
|
public boolean mkdirs(final Path f, final FsPermission permission) throws IOException {
|
|
public boolean mkdirs(final Path f, final FsPermission permission) throws IOException {
|
|
LOG.debug(
|
|
LOG.debug(
|
|
"AzureBlobFileSystem.mkdirs path: {} permissions: {}", f, permission);
|
|
"AzureBlobFileSystem.mkdirs path: {} permissions: {}", f, permission);
|
|
-
|
|
|
|
|
|
+ statIncrement(CALL_MKDIRS);
|
|
trailingPeriodCheck(f);
|
|
trailingPeriodCheck(f);
|
|
|
|
|
|
final Path parentFolder = f.getParent();
|
|
final Path parentFolder = f.getParent();
|
|
@@ -408,6 +438,7 @@ public class AzureBlobFileSystem extends FileSystem {
|
|
try {
|
|
try {
|
|
abfsStore.createDirectory(qualifiedPath, permission == null ? FsPermission.getDirDefault() : permission,
|
|
abfsStore.createDirectory(qualifiedPath, permission == null ? FsPermission.getDirDefault() : permission,
|
|
FsPermission.getUMask(getConf()));
|
|
FsPermission.getUMask(getConf()));
|
|
|
|
+ statIncrement(DIRECTORIES_CREATED);
|
|
return true;
|
|
return true;
|
|
} catch (AzureBlobFileSystemException ex) {
|
|
} catch (AzureBlobFileSystemException ex) {
|
|
checkException(f, ex, AzureServiceErrorCode.PATH_ALREADY_EXISTS);
|
|
checkException(f, ex, AzureServiceErrorCode.PATH_ALREADY_EXISTS);
|
|
@@ -425,12 +456,13 @@ public class AzureBlobFileSystem extends FileSystem {
|
|
LOG.debug("AzureBlobFileSystem.close");
|
|
LOG.debug("AzureBlobFileSystem.close");
|
|
IOUtils.cleanupWithLogger(LOG, abfsStore, delegationTokenManager);
|
|
IOUtils.cleanupWithLogger(LOG, abfsStore, delegationTokenManager);
|
|
this.isClosed = true;
|
|
this.isClosed = true;
|
|
|
|
+ LOG.debug("Closing Abfs: " + toString());
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public FileStatus getFileStatus(final Path f) throws IOException {
|
|
public FileStatus getFileStatus(final Path f) throws IOException {
|
|
LOG.debug("AzureBlobFileSystem.getFileStatus path: {}", f);
|
|
LOG.debug("AzureBlobFileSystem.getFileStatus path: {}", f);
|
|
-
|
|
|
|
|
|
+ statIncrement(CALL_GET_FILE_STATUS);
|
|
Path qualifiedPath = makeQualified(f);
|
|
Path qualifiedPath = makeQualified(f);
|
|
|
|
|
|
try {
|
|
try {
|
|
@@ -567,6 +599,11 @@ public class AzureBlobFileSystem extends FileSystem {
|
|
@Override
|
|
@Override
|
|
public Void call() throws Exception {
|
|
public Void call() throws Exception {
|
|
delete(fs.getPath(), fs.isDirectory());
|
|
delete(fs.getPath(), fs.isDirectory());
|
|
|
|
+ if (fs.isDirectory()) {
|
|
|
|
+ statIncrement(DIRECTORIES_DELETED);
|
|
|
|
+ } else {
|
|
|
|
+ statIncrement(FILES_DELETED);
|
|
|
|
+ }
|
|
return null;
|
|
return null;
|
|
}
|
|
}
|
|
});
|
|
});
|
|
@@ -930,11 +967,25 @@ public class AzureBlobFileSystem extends FileSystem {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Incrementing exists() calls from superclass for statistic collection.
|
|
|
|
+ *
|
|
|
|
+ * @param f source path.
|
|
|
|
+ * @return true if the path exists.
|
|
|
|
+ * @throws IOException
|
|
|
|
+ */
|
|
|
|
+ @Override
|
|
|
|
+ public boolean exists(Path f) throws IOException {
|
|
|
|
+ statIncrement(CALL_EXIST);
|
|
|
|
+ return super.exists(f);
|
|
|
|
+ }
|
|
|
|
+
|
|
private FileStatus tryGetFileStatus(final Path f) {
|
|
private FileStatus tryGetFileStatus(final Path f) {
|
|
try {
|
|
try {
|
|
return getFileStatus(f);
|
|
return getFileStatus(f);
|
|
} catch (IOException ex) {
|
|
} catch (IOException ex) {
|
|
LOG.debug("File not found {}", f);
|
|
LOG.debug("File not found {}", f);
|
|
|
|
+ statIncrement(ERROR_IGNORED);
|
|
return null;
|
|
return null;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -951,6 +1002,7 @@ public class AzureBlobFileSystem extends FileSystem {
|
|
// there is not way to get the storage error code
|
|
// there is not way to get the storage error code
|
|
// workaround here is to check its status code.
|
|
// workaround here is to check its status code.
|
|
} catch (FileNotFoundException e) {
|
|
} catch (FileNotFoundException e) {
|
|
|
|
+ statIncrement(ERROR_IGNORED);
|
|
return false;
|
|
return false;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -1124,6 +1176,7 @@ public class AzureBlobFileSystem extends FileSystem {
|
|
*/
|
|
*/
|
|
@Override
|
|
@Override
|
|
public synchronized Token<?> getDelegationToken(final String renewer) throws IOException {
|
|
public synchronized Token<?> getDelegationToken(final String renewer) throws IOException {
|
|
|
|
+ statIncrement(CALL_GET_DELEGATION_TOKEN);
|
|
return this.delegationTokenEnabled ? this.delegationTokenManager.getDelegationToken(renewer)
|
|
return this.delegationTokenEnabled ? this.delegationTokenManager.getDelegationToken(renewer)
|
|
: super.getDelegationToken(renewer);
|
|
: super.getDelegationToken(renewer);
|
|
}
|
|
}
|
|
@@ -1186,6 +1239,11 @@ public class AzureBlobFileSystem extends FileSystem {
|
|
return abfsStore.getIsNamespaceEnabled();
|
|
return abfsStore.getIsNamespaceEnabled();
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ @VisibleForTesting
|
|
|
|
+ Map<String, Long> getInstrumentationMap() {
|
|
|
|
+ return instrumentation.toMap();
|
|
|
|
+ }
|
|
|
|
+
|
|
@Override
|
|
@Override
|
|
public boolean hasPathCapability(final Path path, final String capability)
|
|
public boolean hasPathCapability(final Path path, final String capability)
|
|
throws IOException {
|
|
throws IOException {
|