|
@@ -48,8 +48,11 @@ import org.apache.hadoop.fs.FileSystem;
|
|
import org.apache.hadoop.fs.FileSystemLinkResolver;
|
|
import org.apache.hadoop.fs.FileSystemLinkResolver;
|
|
import org.apache.hadoop.fs.FsServerDefaults;
|
|
import org.apache.hadoop.fs.FsServerDefaults;
|
|
import org.apache.hadoop.fs.FsStatus;
|
|
import org.apache.hadoop.fs.FsStatus;
|
|
|
|
+import org.apache.hadoop.fs.GlobalStorageStatistics;
|
|
|
|
+import org.apache.hadoop.fs.GlobalStorageStatistics.StorageStatisticsProvider;
|
|
import org.apache.hadoop.fs.LocatedFileStatus;
|
|
import org.apache.hadoop.fs.LocatedFileStatus;
|
|
import org.apache.hadoop.fs.Options;
|
|
import org.apache.hadoop.fs.Options;
|
|
|
|
+import org.apache.hadoop.fs.StorageStatistics;
|
|
import org.apache.hadoop.fs.XAttrSetFlag;
|
|
import org.apache.hadoop.fs.XAttrSetFlag;
|
|
import org.apache.hadoop.fs.Options.ChecksumOpt;
|
|
import org.apache.hadoop.fs.Options.ChecksumOpt;
|
|
import org.apache.hadoop.fs.Path;
|
|
import org.apache.hadoop.fs.Path;
|
|
@@ -67,6 +70,7 @@ import org.apache.hadoop.fs.StorageType;
|
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
|
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
|
|
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
|
|
import org.apache.hadoop.hdfs.client.impl.CorruptFileBlockIterator;
|
|
import org.apache.hadoop.hdfs.client.impl.CorruptFileBlockIterator;
|
|
|
|
+import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType;
|
|
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
|
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
|
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
|
|
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
|
|
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
|
|
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
|
|
@@ -96,7 +100,6 @@ import org.apache.hadoop.crypto.key.KeyProviderDelegationTokenExtension;
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
import com.google.common.base.Preconditions;
|
|
import com.google.common.base.Preconditions;
|
|
|
|
|
|
-
|
|
|
|
/****************************************************************
|
|
/****************************************************************
|
|
* Implementation of the abstract FileSystem for the DFS system.
|
|
* Implementation of the abstract FileSystem for the DFS system.
|
|
* This object is the way end-user code interacts with a Hadoop
|
|
* This object is the way end-user code interacts with a Hadoop
|
|
@@ -114,6 +117,8 @@ public class DistributedFileSystem extends FileSystem {
|
|
DFSClient dfs;
|
|
DFSClient dfs;
|
|
private boolean verifyChecksum = true;
|
|
private boolean verifyChecksum = true;
|
|
|
|
|
|
|
|
+ private DFSOpsCountStatistics storageStatistics;
|
|
|
|
+
|
|
static{
|
|
static{
|
|
HdfsConfiguration.init();
|
|
HdfsConfiguration.init();
|
|
}
|
|
}
|
|
@@ -151,6 +156,15 @@ public class DistributedFileSystem extends FileSystem {
|
|
this.dfs = new DFSClient(uri, conf, statistics);
|
|
this.dfs = new DFSClient(uri, conf, statistics);
|
|
this.uri = URI.create(uri.getScheme()+"://"+uri.getAuthority());
|
|
this.uri = URI.create(uri.getScheme()+"://"+uri.getAuthority());
|
|
this.workingDir = getHomeDirectory();
|
|
this.workingDir = getHomeDirectory();
|
|
|
|
+
|
|
|
|
+ storageStatistics = (DFSOpsCountStatistics) GlobalStorageStatistics.INSTANCE
|
|
|
|
+ .put(DFSOpsCountStatistics.NAME,
|
|
|
|
+ new StorageStatisticsProvider() {
|
|
|
|
+ @Override
|
|
|
|
+ public StorageStatistics provide() {
|
|
|
|
+ return new DFSOpsCountStatistics();
|
|
|
|
+ }
|
|
|
|
+ });
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
@@ -215,6 +229,7 @@ public class DistributedFileSystem extends FileSystem {
|
|
public BlockLocation[] getFileBlockLocations(Path p,
|
|
public BlockLocation[] getFileBlockLocations(Path p,
|
|
final long start, final long len) throws IOException {
|
|
final long start, final long len) throws IOException {
|
|
statistics.incrementReadOps(1);
|
|
statistics.incrementReadOps(1);
|
|
|
|
+ storageStatistics.incrementOpCounter(OpType.GET_FILE_BLOCK_LOCATIONS);
|
|
final Path absF = fixRelativePart(p);
|
|
final Path absF = fixRelativePart(p);
|
|
return new FileSystemLinkResolver<BlockLocation[]>() {
|
|
return new FileSystemLinkResolver<BlockLocation[]>() {
|
|
@Override
|
|
@Override
|
|
@@ -301,6 +316,7 @@ public class DistributedFileSystem extends FileSystem {
|
|
public FSDataInputStream open(Path f, final int bufferSize)
|
|
public FSDataInputStream open(Path f, final int bufferSize)
|
|
throws IOException {
|
|
throws IOException {
|
|
statistics.incrementReadOps(1);
|
|
statistics.incrementReadOps(1);
|
|
|
|
+ storageStatistics.incrementOpCounter(OpType.OPEN);
|
|
Path absF = fixRelativePart(f);
|
|
Path absF = fixRelativePart(f);
|
|
return new FileSystemLinkResolver<FSDataInputStream>() {
|
|
return new FileSystemLinkResolver<FSDataInputStream>() {
|
|
@Override
|
|
@Override
|
|
@@ -337,6 +353,7 @@ public class DistributedFileSystem extends FileSystem {
|
|
public FSDataOutputStream append(Path f, final EnumSet<CreateFlag> flag,
|
|
public FSDataOutputStream append(Path f, final EnumSet<CreateFlag> flag,
|
|
final int bufferSize, final Progressable progress) throws IOException {
|
|
final int bufferSize, final Progressable progress) throws IOException {
|
|
statistics.incrementWriteOps(1);
|
|
statistics.incrementWriteOps(1);
|
|
|
|
+ storageStatistics.incrementOpCounter(OpType.APPEND);
|
|
Path absF = fixRelativePart(f);
|
|
Path absF = fixRelativePart(f);
|
|
return new FileSystemLinkResolver<FSDataOutputStream>() {
|
|
return new FileSystemLinkResolver<FSDataOutputStream>() {
|
|
@Override
|
|
@Override
|
|
@@ -369,6 +386,7 @@ public class DistributedFileSystem extends FileSystem {
|
|
final int bufferSize, final Progressable progress,
|
|
final int bufferSize, final Progressable progress,
|
|
final InetSocketAddress[] favoredNodes) throws IOException {
|
|
final InetSocketAddress[] favoredNodes) throws IOException {
|
|
statistics.incrementWriteOps(1);
|
|
statistics.incrementWriteOps(1);
|
|
|
|
+ storageStatistics.incrementOpCounter(OpType.APPEND);
|
|
Path absF = fixRelativePart(f);
|
|
Path absF = fixRelativePart(f);
|
|
return new FileSystemLinkResolver<FSDataOutputStream>() {
|
|
return new FileSystemLinkResolver<FSDataOutputStream>() {
|
|
@Override
|
|
@Override
|
|
@@ -412,6 +430,7 @@ public class DistributedFileSystem extends FileSystem {
|
|
final Progressable progress, final InetSocketAddress[] favoredNodes)
|
|
final Progressable progress, final InetSocketAddress[] favoredNodes)
|
|
throws IOException {
|
|
throws IOException {
|
|
statistics.incrementWriteOps(1);
|
|
statistics.incrementWriteOps(1);
|
|
|
|
+ storageStatistics.incrementOpCounter(OpType.CREATE);
|
|
Path absF = fixRelativePart(f);
|
|
Path absF = fixRelativePart(f);
|
|
return new FileSystemLinkResolver<HdfsDataOutputStream>() {
|
|
return new FileSystemLinkResolver<HdfsDataOutputStream>() {
|
|
@Override
|
|
@Override
|
|
@@ -445,6 +464,7 @@ public class DistributedFileSystem extends FileSystem {
|
|
final Progressable progress, final ChecksumOpt checksumOpt)
|
|
final Progressable progress, final ChecksumOpt checksumOpt)
|
|
throws IOException {
|
|
throws IOException {
|
|
statistics.incrementWriteOps(1);
|
|
statistics.incrementWriteOps(1);
|
|
|
|
+ storageStatistics.incrementOpCounter(OpType.CREATE);
|
|
Path absF = fixRelativePart(f);
|
|
Path absF = fixRelativePart(f);
|
|
return new FileSystemLinkResolver<FSDataOutputStream>() {
|
|
return new FileSystemLinkResolver<FSDataOutputStream>() {
|
|
@Override
|
|
@Override
|
|
@@ -469,6 +489,7 @@ public class DistributedFileSystem extends FileSystem {
|
|
short replication, long blockSize, Progressable progress,
|
|
short replication, long blockSize, Progressable progress,
|
|
ChecksumOpt checksumOpt) throws IOException {
|
|
ChecksumOpt checksumOpt) throws IOException {
|
|
statistics.incrementWriteOps(1);
|
|
statistics.incrementWriteOps(1);
|
|
|
|
+ storageStatistics.incrementOpCounter(OpType.PRIMITIVE_CREATE);
|
|
final DFSOutputStream dfsos = dfs.primitiveCreate(
|
|
final DFSOutputStream dfsos = dfs.primitiveCreate(
|
|
getPathName(fixRelativePart(f)),
|
|
getPathName(fixRelativePart(f)),
|
|
absolutePermission, flag, true, replication, blockSize,
|
|
absolutePermission, flag, true, replication, blockSize,
|
|
@@ -485,6 +506,7 @@ public class DistributedFileSystem extends FileSystem {
|
|
final int bufferSize, final short replication, final long blockSize,
|
|
final int bufferSize, final short replication, final long blockSize,
|
|
final Progressable progress) throws IOException {
|
|
final Progressable progress) throws IOException {
|
|
statistics.incrementWriteOps(1);
|
|
statistics.incrementWriteOps(1);
|
|
|
|
+ storageStatistics.incrementOpCounter(OpType.CREATE_NON_RECURSIVE);
|
|
if (flag.contains(CreateFlag.OVERWRITE)) {
|
|
if (flag.contains(CreateFlag.OVERWRITE)) {
|
|
flag.add(CreateFlag.CREATE);
|
|
flag.add(CreateFlag.CREATE);
|
|
}
|
|
}
|
|
@@ -510,6 +532,7 @@ public class DistributedFileSystem extends FileSystem {
|
|
public boolean setReplication(Path src, final short replication)
|
|
public boolean setReplication(Path src, final short replication)
|
|
throws IOException {
|
|
throws IOException {
|
|
statistics.incrementWriteOps(1);
|
|
statistics.incrementWriteOps(1);
|
|
|
|
+ storageStatistics.incrementOpCounter(OpType.SET_REPLICATION);
|
|
Path absF = fixRelativePart(src);
|
|
Path absF = fixRelativePart(src);
|
|
return new FileSystemLinkResolver<Boolean>() {
|
|
return new FileSystemLinkResolver<Boolean>() {
|
|
@Override
|
|
@Override
|
|
@@ -534,6 +557,7 @@ public class DistributedFileSystem extends FileSystem {
|
|
public void setStoragePolicy(final Path src, final String policyName)
|
|
public void setStoragePolicy(final Path src, final String policyName)
|
|
throws IOException {
|
|
throws IOException {
|
|
statistics.incrementWriteOps(1);
|
|
statistics.incrementWriteOps(1);
|
|
|
|
+ storageStatistics.incrementOpCounter(OpType.SET_STORAGE_POLICY);
|
|
Path absF = fixRelativePart(src);
|
|
Path absF = fixRelativePart(src);
|
|
new FileSystemLinkResolver<Void>() {
|
|
new FileSystemLinkResolver<Void>() {
|
|
@Override
|
|
@Override
|
|
@@ -553,6 +577,7 @@ public class DistributedFileSystem extends FileSystem {
|
|
@Override
|
|
@Override
|
|
public BlockStoragePolicySpi getStoragePolicy(Path path) throws IOException {
|
|
public BlockStoragePolicySpi getStoragePolicy(Path path) throws IOException {
|
|
statistics.incrementReadOps(1);
|
|
statistics.incrementReadOps(1);
|
|
|
|
+ storageStatistics.incrementOpCounter(OpType.GET_STORAGE_POLICY);
|
|
Path absF = fixRelativePart(path);
|
|
Path absF = fixRelativePart(path);
|
|
|
|
|
|
return new FileSystemLinkResolver<BlockStoragePolicySpi>() {
|
|
return new FileSystemLinkResolver<BlockStoragePolicySpi>() {
|
|
@@ -583,6 +608,7 @@ public class DistributedFileSystem extends FileSystem {
|
|
*/
|
|
*/
|
|
public long getBytesWithFutureGenerationStamps() throws IOException {
|
|
public long getBytesWithFutureGenerationStamps() throws IOException {
|
|
statistics.incrementReadOps(1);
|
|
statistics.incrementReadOps(1);
|
|
|
|
+ storageStatistics.incrementOpCounter(OpType.GET_BYTES_WITH_FUTURE_GS);
|
|
return dfs.getBytesInFutureBlocks();
|
|
return dfs.getBytesInFutureBlocks();
|
|
}
|
|
}
|
|
|
|
|
|
@@ -593,6 +619,7 @@ public class DistributedFileSystem extends FileSystem {
|
|
@Deprecated
|
|
@Deprecated
|
|
public BlockStoragePolicy[] getStoragePolicies() throws IOException {
|
|
public BlockStoragePolicy[] getStoragePolicies() throws IOException {
|
|
statistics.incrementReadOps(1);
|
|
statistics.incrementReadOps(1);
|
|
|
|
+ storageStatistics.incrementOpCounter(OpType.GET_STORAGE_POLICIES);
|
|
return dfs.getStoragePolicies();
|
|
return dfs.getStoragePolicies();
|
|
}
|
|
}
|
|
|
|
|
|
@@ -607,6 +634,7 @@ public class DistributedFileSystem extends FileSystem {
|
|
@Override
|
|
@Override
|
|
public void concat(Path trg, Path [] psrcs) throws IOException {
|
|
public void concat(Path trg, Path [] psrcs) throws IOException {
|
|
statistics.incrementWriteOps(1);
|
|
statistics.incrementWriteOps(1);
|
|
|
|
+ storageStatistics.incrementOpCounter(OpType.CONCAT);
|
|
// Make target absolute
|
|
// Make target absolute
|
|
Path absF = fixRelativePart(trg);
|
|
Path absF = fixRelativePart(trg);
|
|
// Make all srcs absolute
|
|
// Make all srcs absolute
|
|
@@ -651,6 +679,7 @@ public class DistributedFileSystem extends FileSystem {
|
|
@Override
|
|
@Override
|
|
public boolean rename(Path src, Path dst) throws IOException {
|
|
public boolean rename(Path src, Path dst) throws IOException {
|
|
statistics.incrementWriteOps(1);
|
|
statistics.incrementWriteOps(1);
|
|
|
|
+ storageStatistics.incrementOpCounter(OpType.RENAME);
|
|
|
|
|
|
final Path absSrc = fixRelativePart(src);
|
|
final Path absSrc = fixRelativePart(src);
|
|
final Path absDst = fixRelativePart(dst);
|
|
final Path absDst = fixRelativePart(dst);
|
|
@@ -685,6 +714,7 @@ public class DistributedFileSystem extends FileSystem {
|
|
public void rename(Path src, Path dst, final Options.Rename... options)
|
|
public void rename(Path src, Path dst, final Options.Rename... options)
|
|
throws IOException {
|
|
throws IOException {
|
|
statistics.incrementWriteOps(1);
|
|
statistics.incrementWriteOps(1);
|
|
|
|
+ storageStatistics.incrementOpCounter(OpType.RENAME);
|
|
final Path absSrc = fixRelativePart(src);
|
|
final Path absSrc = fixRelativePart(src);
|
|
final Path absDst = fixRelativePart(dst);
|
|
final Path absDst = fixRelativePart(dst);
|
|
// Try the rename without resolving first
|
|
// Try the rename without resolving first
|
|
@@ -713,6 +743,7 @@ public class DistributedFileSystem extends FileSystem {
|
|
@Override
|
|
@Override
|
|
public boolean truncate(Path f, final long newLength) throws IOException {
|
|
public boolean truncate(Path f, final long newLength) throws IOException {
|
|
statistics.incrementWriteOps(1);
|
|
statistics.incrementWriteOps(1);
|
|
|
|
+ storageStatistics.incrementOpCounter(OpType.TRUNCATE);
|
|
Path absF = fixRelativePart(f);
|
|
Path absF = fixRelativePart(f);
|
|
return new FileSystemLinkResolver<Boolean>() {
|
|
return new FileSystemLinkResolver<Boolean>() {
|
|
@Override
|
|
@Override
|
|
@@ -730,6 +761,7 @@ public class DistributedFileSystem extends FileSystem {
|
|
@Override
|
|
@Override
|
|
public boolean delete(Path f, final boolean recursive) throws IOException {
|
|
public boolean delete(Path f, final boolean recursive) throws IOException {
|
|
statistics.incrementWriteOps(1);
|
|
statistics.incrementWriteOps(1);
|
|
|
|
+ storageStatistics.incrementOpCounter(OpType.DELETE);
|
|
Path absF = fixRelativePart(f);
|
|
Path absF = fixRelativePart(f);
|
|
return new FileSystemLinkResolver<Boolean>() {
|
|
return new FileSystemLinkResolver<Boolean>() {
|
|
@Override
|
|
@Override
|
|
@@ -747,6 +779,7 @@ public class DistributedFileSystem extends FileSystem {
|
|
@Override
|
|
@Override
|
|
public ContentSummary getContentSummary(Path f) throws IOException {
|
|
public ContentSummary getContentSummary(Path f) throws IOException {
|
|
statistics.incrementReadOps(1);
|
|
statistics.incrementReadOps(1);
|
|
|
|
+ storageStatistics.incrementOpCounter(OpType.GET_CONTENT_SUMMARY);
|
|
Path absF = fixRelativePart(f);
|
|
Path absF = fixRelativePart(f);
|
|
return new FileSystemLinkResolver<ContentSummary>() {
|
|
return new FileSystemLinkResolver<ContentSummary>() {
|
|
@Override
|
|
@Override
|
|
@@ -764,6 +797,7 @@ public class DistributedFileSystem extends FileSystem {
|
|
@Override
|
|
@Override
|
|
public QuotaUsage getQuotaUsage(Path f) throws IOException {
|
|
public QuotaUsage getQuotaUsage(Path f) throws IOException {
|
|
statistics.incrementReadOps(1);
|
|
statistics.incrementReadOps(1);
|
|
|
|
+ storageStatistics.incrementOpCounter(OpType.GET_QUOTA_USAGE);
|
|
Path absF = fixRelativePart(f);
|
|
Path absF = fixRelativePart(f);
|
|
return new FileSystemLinkResolver<QuotaUsage>() {
|
|
return new FileSystemLinkResolver<QuotaUsage>() {
|
|
@Override
|
|
@Override
|
|
@@ -848,6 +882,7 @@ public class DistributedFileSystem extends FileSystem {
|
|
stats[i] = partialListing[i].makeQualified(getUri(), p);
|
|
stats[i] = partialListing[i].makeQualified(getUri(), p);
|
|
}
|
|
}
|
|
statistics.incrementReadOps(1);
|
|
statistics.incrementReadOps(1);
|
|
|
|
+ storageStatistics.incrementOpCounter(OpType.LIST_STATUS);
|
|
return stats;
|
|
return stats;
|
|
}
|
|
}
|
|
|
|
|
|
@@ -862,6 +897,7 @@ public class DistributedFileSystem extends FileSystem {
|
|
listing.add(fileStatus.makeQualified(getUri(), p));
|
|
listing.add(fileStatus.makeQualified(getUri(), p));
|
|
}
|
|
}
|
|
statistics.incrementLargeReadOps(1);
|
|
statistics.incrementLargeReadOps(1);
|
|
|
|
+ storageStatistics.incrementOpCounter(OpType.LIST_STATUS);
|
|
|
|
|
|
// now fetch more entries
|
|
// now fetch more entries
|
|
do {
|
|
do {
|
|
@@ -876,6 +912,7 @@ public class DistributedFileSystem extends FileSystem {
|
|
listing.add(fileStatus.makeQualified(getUri(), p));
|
|
listing.add(fileStatus.makeQualified(getUri(), p));
|
|
}
|
|
}
|
|
statistics.incrementLargeReadOps(1);
|
|
statistics.incrementLargeReadOps(1);
|
|
|
|
+ storageStatistics.incrementOpCounter(OpType.LIST_STATUS);
|
|
} while (thisListing.hasMore());
|
|
} while (thisListing.hasMore());
|
|
|
|
|
|
return listing.toArray(new FileStatus[listing.size()]);
|
|
return listing.toArray(new FileStatus[listing.size()]);
|
|
@@ -989,6 +1026,7 @@ public class DistributedFileSystem extends FileSystem {
|
|
thisListing = dfs.listPaths(src, HdfsFileStatus.EMPTY_NAME,
|
|
thisListing = dfs.listPaths(src, HdfsFileStatus.EMPTY_NAME,
|
|
needLocation);
|
|
needLocation);
|
|
statistics.incrementReadOps(1);
|
|
statistics.incrementReadOps(1);
|
|
|
|
+ storageStatistics.incrementOpCounter(OpType.LIST_LOCATED_STATUS);
|
|
if (thisListing == null) { // the directory does not exist
|
|
if (thisListing == null) { // the directory does not exist
|
|
throw new FileNotFoundException("File " + p + " does not exist.");
|
|
throw new FileNotFoundException("File " + p + " does not exist.");
|
|
}
|
|
}
|
|
@@ -1084,6 +1122,7 @@ public class DistributedFileSystem extends FileSystem {
|
|
private boolean mkdirsInternal(Path f, final FsPermission permission,
|
|
private boolean mkdirsInternal(Path f, final FsPermission permission,
|
|
final boolean createParent) throws IOException {
|
|
final boolean createParent) throws IOException {
|
|
statistics.incrementWriteOps(1);
|
|
statistics.incrementWriteOps(1);
|
|
|
|
+ storageStatistics.incrementOpCounter(OpType.MKDIRS);
|
|
Path absF = fixRelativePart(f);
|
|
Path absF = fixRelativePart(f);
|
|
return new FileSystemLinkResolver<Boolean>() {
|
|
return new FileSystemLinkResolver<Boolean>() {
|
|
@Override
|
|
@Override
|
|
@@ -1110,6 +1149,7 @@ public class DistributedFileSystem extends FileSystem {
|
|
protected boolean primitiveMkdir(Path f, FsPermission absolutePermission)
|
|
protected boolean primitiveMkdir(Path f, FsPermission absolutePermission)
|
|
throws IOException {
|
|
throws IOException {
|
|
statistics.incrementWriteOps(1);
|
|
statistics.incrementWriteOps(1);
|
|
|
|
+ storageStatistics.incrementOpCounter(OpType.PRIMITIVE_MKDIR);
|
|
return dfs.primitiveMkdir(getPathName(f), absolutePermission);
|
|
return dfs.primitiveMkdir(getPathName(f), absolutePermission);
|
|
}
|
|
}
|
|
|
|
|
|
@@ -1155,6 +1195,7 @@ public class DistributedFileSystem extends FileSystem {
|
|
@Override
|
|
@Override
|
|
public FsStatus getStatus(Path p) throws IOException {
|
|
public FsStatus getStatus(Path p) throws IOException {
|
|
statistics.incrementReadOps(1);
|
|
statistics.incrementReadOps(1);
|
|
|
|
+ storageStatistics.incrementOpCounter(OpType.GET_STATUS);
|
|
return dfs.getDiskStatus();
|
|
return dfs.getDiskStatus();
|
|
}
|
|
}
|
|
|
|
|
|
@@ -1362,6 +1403,7 @@ public class DistributedFileSystem extends FileSystem {
|
|
@Override
|
|
@Override
|
|
public FileStatus getFileStatus(Path f) throws IOException {
|
|
public FileStatus getFileStatus(Path f) throws IOException {
|
|
statistics.incrementReadOps(1);
|
|
statistics.incrementReadOps(1);
|
|
|
|
+ storageStatistics.incrementOpCounter(OpType.GET_FILE_STATUS);
|
|
Path absF = fixRelativePart(f);
|
|
Path absF = fixRelativePart(f);
|
|
return new FileSystemLinkResolver<FileStatus>() {
|
|
return new FileSystemLinkResolver<FileStatus>() {
|
|
@Override
|
|
@Override
|
|
@@ -1389,6 +1431,7 @@ public class DistributedFileSystem extends FileSystem {
|
|
throw new UnsupportedOperationException("Symlinks not supported");
|
|
throw new UnsupportedOperationException("Symlinks not supported");
|
|
}
|
|
}
|
|
statistics.incrementWriteOps(1);
|
|
statistics.incrementWriteOps(1);
|
|
|
|
+ storageStatistics.incrementOpCounter(OpType.CREATE_SYM_LINK);
|
|
final Path absF = fixRelativePart(link);
|
|
final Path absF = fixRelativePart(link);
|
|
new FileSystemLinkResolver<Void>() {
|
|
new FileSystemLinkResolver<Void>() {
|
|
@Override
|
|
@Override
|
|
@@ -1412,6 +1455,7 @@ public class DistributedFileSystem extends FileSystem {
|
|
@Override
|
|
@Override
|
|
public FileStatus getFileLinkStatus(final Path f) throws IOException {
|
|
public FileStatus getFileLinkStatus(final Path f) throws IOException {
|
|
statistics.incrementReadOps(1);
|
|
statistics.incrementReadOps(1);
|
|
|
|
+ storageStatistics.incrementOpCounter(OpType.GET_FILE_LINK_STATUS);
|
|
final Path absF = fixRelativePart(f);
|
|
final Path absF = fixRelativePart(f);
|
|
FileStatus status = new FileSystemLinkResolver<FileStatus>() {
|
|
FileStatus status = new FileSystemLinkResolver<FileStatus>() {
|
|
@Override
|
|
@Override
|
|
@@ -1441,6 +1485,7 @@ public class DistributedFileSystem extends FileSystem {
|
|
@Override
|
|
@Override
|
|
public Path getLinkTarget(final Path f) throws IOException {
|
|
public Path getLinkTarget(final Path f) throws IOException {
|
|
statistics.incrementReadOps(1);
|
|
statistics.incrementReadOps(1);
|
|
|
|
+ storageStatistics.incrementOpCounter(OpType.GET_LINK_TARGET);
|
|
final Path absF = fixRelativePart(f);
|
|
final Path absF = fixRelativePart(f);
|
|
return new FileSystemLinkResolver<Path>() {
|
|
return new FileSystemLinkResolver<Path>() {
|
|
@Override
|
|
@Override
|
|
@@ -1462,6 +1507,7 @@ public class DistributedFileSystem extends FileSystem {
|
|
@Override
|
|
@Override
|
|
protected Path resolveLink(Path f) throws IOException {
|
|
protected Path resolveLink(Path f) throws IOException {
|
|
statistics.incrementReadOps(1);
|
|
statistics.incrementReadOps(1);
|
|
|
|
+ storageStatistics.incrementOpCounter(OpType.RESOLVE_LINK);
|
|
String target = dfs.getLinkTarget(getPathName(fixRelativePart(f)));
|
|
String target = dfs.getLinkTarget(getPathName(fixRelativePart(f)));
|
|
if (target == null) {
|
|
if (target == null) {
|
|
throw new FileNotFoundException("File does not exist: " + f.toString());
|
|
throw new FileNotFoundException("File does not exist: " + f.toString());
|
|
@@ -1472,6 +1518,7 @@ public class DistributedFileSystem extends FileSystem {
|
|
@Override
|
|
@Override
|
|
public FileChecksum getFileChecksum(Path f) throws IOException {
|
|
public FileChecksum getFileChecksum(Path f) throws IOException {
|
|
statistics.incrementReadOps(1);
|
|
statistics.incrementReadOps(1);
|
|
|
|
+ storageStatistics.incrementOpCounter(OpType.GET_FILE_CHECKSUM);
|
|
Path absF = fixRelativePart(f);
|
|
Path absF = fixRelativePart(f);
|
|
return new FileSystemLinkResolver<FileChecksum>() {
|
|
return new FileSystemLinkResolver<FileChecksum>() {
|
|
@Override
|
|
@Override
|
|
@@ -1491,6 +1538,7 @@ public class DistributedFileSystem extends FileSystem {
|
|
public FileChecksum getFileChecksum(Path f, final long length)
|
|
public FileChecksum getFileChecksum(Path f, final long length)
|
|
throws IOException {
|
|
throws IOException {
|
|
statistics.incrementReadOps(1);
|
|
statistics.incrementReadOps(1);
|
|
|
|
+ storageStatistics.incrementOpCounter(OpType.GET_FILE_CHECKSUM);
|
|
Path absF = fixRelativePart(f);
|
|
Path absF = fixRelativePart(f);
|
|
return new FileSystemLinkResolver<FileChecksum>() {
|
|
return new FileSystemLinkResolver<FileChecksum>() {
|
|
@Override
|
|
@Override
|
|
@@ -1516,6 +1564,7 @@ public class DistributedFileSystem extends FileSystem {
|
|
public void setPermission(Path p, final FsPermission permission
|
|
public void setPermission(Path p, final FsPermission permission
|
|
) throws IOException {
|
|
) throws IOException {
|
|
statistics.incrementWriteOps(1);
|
|
statistics.incrementWriteOps(1);
|
|
|
|
+ storageStatistics.incrementOpCounter(OpType.SET_PERMISSION);
|
|
Path absF = fixRelativePart(p);
|
|
Path absF = fixRelativePart(p);
|
|
new FileSystemLinkResolver<Void>() {
|
|
new FileSystemLinkResolver<Void>() {
|
|
@Override
|
|
@Override
|
|
@@ -1540,6 +1589,7 @@ public class DistributedFileSystem extends FileSystem {
|
|
throw new IOException("username == null && groupname == null");
|
|
throw new IOException("username == null && groupname == null");
|
|
}
|
|
}
|
|
statistics.incrementWriteOps(1);
|
|
statistics.incrementWriteOps(1);
|
|
|
|
+ storageStatistics.incrementOpCounter(OpType.SET_OWNER);
|
|
Path absF = fixRelativePart(p);
|
|
Path absF = fixRelativePart(p);
|
|
new FileSystemLinkResolver<Void>() {
|
|
new FileSystemLinkResolver<Void>() {
|
|
@Override
|
|
@Override
|
|
@@ -1561,6 +1611,7 @@ public class DistributedFileSystem extends FileSystem {
|
|
public void setTimes(Path p, final long mtime, final long atime)
|
|
public void setTimes(Path p, final long mtime, final long atime)
|
|
throws IOException {
|
|
throws IOException {
|
|
statistics.incrementWriteOps(1);
|
|
statistics.incrementWriteOps(1);
|
|
|
|
+ storageStatistics.incrementOpCounter(OpType.SET_TIMES);
|
|
Path absF = fixRelativePart(p);
|
|
Path absF = fixRelativePart(p);
|
|
new FileSystemLinkResolver<Void>() {
|
|
new FileSystemLinkResolver<Void>() {
|
|
@Override
|
|
@Override
|
|
@@ -1638,6 +1689,8 @@ public class DistributedFileSystem extends FileSystem {
|
|
|
|
|
|
/** @see HdfsAdmin#allowSnapshot(Path) */
|
|
/** @see HdfsAdmin#allowSnapshot(Path) */
|
|
public void allowSnapshot(final Path path) throws IOException {
|
|
public void allowSnapshot(final Path path) throws IOException {
|
|
|
|
+ statistics.incrementWriteOps(1);
|
|
|
|
+ storageStatistics.incrementOpCounter(OpType.ALLOW_SNAPSHOT);
|
|
Path absF = fixRelativePart(path);
|
|
Path absF = fixRelativePart(path);
|
|
new FileSystemLinkResolver<Void>() {
|
|
new FileSystemLinkResolver<Void>() {
|
|
@Override
|
|
@Override
|
|
@@ -1664,6 +1717,8 @@ public class DistributedFileSystem extends FileSystem {
|
|
|
|
|
|
/** @see HdfsAdmin#disallowSnapshot(Path) */
|
|
/** @see HdfsAdmin#disallowSnapshot(Path) */
|
|
public void disallowSnapshot(final Path path) throws IOException {
|
|
public void disallowSnapshot(final Path path) throws IOException {
|
|
|
|
+ statistics.incrementWriteOps(1);
|
|
|
|
+ storageStatistics.incrementOpCounter(OpType.DISALLOW_SNAPSHOT);
|
|
Path absF = fixRelativePart(path);
|
|
Path absF = fixRelativePart(path);
|
|
new FileSystemLinkResolver<Void>() {
|
|
new FileSystemLinkResolver<Void>() {
|
|
@Override
|
|
@Override
|
|
@@ -1691,6 +1746,8 @@ public class DistributedFileSystem extends FileSystem {
|
|
@Override
|
|
@Override
|
|
public Path createSnapshot(final Path path, final String snapshotName)
|
|
public Path createSnapshot(final Path path, final String snapshotName)
|
|
throws IOException {
|
|
throws IOException {
|
|
|
|
+ statistics.incrementWriteOps(1);
|
|
|
|
+ storageStatistics.incrementOpCounter(OpType.CREATE_SNAPSHOT);
|
|
Path absF = fixRelativePart(path);
|
|
Path absF = fixRelativePart(path);
|
|
return new FileSystemLinkResolver<Path>() {
|
|
return new FileSystemLinkResolver<Path>() {
|
|
@Override
|
|
@Override
|
|
@@ -1716,6 +1773,8 @@ public class DistributedFileSystem extends FileSystem {
|
|
@Override
|
|
@Override
|
|
public void renameSnapshot(final Path path, final String snapshotOldName,
|
|
public void renameSnapshot(final Path path, final String snapshotOldName,
|
|
final String snapshotNewName) throws IOException {
|
|
final String snapshotNewName) throws IOException {
|
|
|
|
+ statistics.incrementWriteOps(1);
|
|
|
|
+ storageStatistics.incrementOpCounter(OpType.RENAME_SNAPSHOT);
|
|
Path absF = fixRelativePart(path);
|
|
Path absF = fixRelativePart(path);
|
|
new FileSystemLinkResolver<Void>() {
|
|
new FileSystemLinkResolver<Void>() {
|
|
@Override
|
|
@Override
|
|
@@ -1752,6 +1811,8 @@ public class DistributedFileSystem extends FileSystem {
|
|
@Override
|
|
@Override
|
|
public void deleteSnapshot(final Path snapshotDir, final String snapshotName)
|
|
public void deleteSnapshot(final Path snapshotDir, final String snapshotName)
|
|
throws IOException {
|
|
throws IOException {
|
|
|
|
+ statistics.incrementWriteOps(1);
|
|
|
|
+ storageStatistics.incrementOpCounter(OpType.DELETE_SNAPSHOT);
|
|
Path absF = fixRelativePart(snapshotDir);
|
|
Path absF = fixRelativePart(snapshotDir);
|
|
new FileSystemLinkResolver<Void>() {
|
|
new FileSystemLinkResolver<Void>() {
|
|
@Override
|
|
@Override
|
|
@@ -1999,6 +2060,8 @@ public class DistributedFileSystem extends FileSystem {
|
|
@Override
|
|
@Override
|
|
public void modifyAclEntries(Path path, final List<AclEntry> aclSpec)
|
|
public void modifyAclEntries(Path path, final List<AclEntry> aclSpec)
|
|
throws IOException {
|
|
throws IOException {
|
|
|
|
+ statistics.incrementWriteOps(1);
|
|
|
|
+ storageStatistics.incrementOpCounter(OpType.MODIFY_ACL_ENTRIES);
|
|
Path absF = fixRelativePart(path);
|
|
Path absF = fixRelativePart(path);
|
|
new FileSystemLinkResolver<Void>() {
|
|
new FileSystemLinkResolver<Void>() {
|
|
@Override
|
|
@Override
|
|
@@ -2021,6 +2084,8 @@ public class DistributedFileSystem extends FileSystem {
|
|
@Override
|
|
@Override
|
|
public void removeAclEntries(Path path, final List<AclEntry> aclSpec)
|
|
public void removeAclEntries(Path path, final List<AclEntry> aclSpec)
|
|
throws IOException {
|
|
throws IOException {
|
|
|
|
+ statistics.incrementWriteOps(1);
|
|
|
|
+ storageStatistics.incrementOpCounter(OpType.REMOVE_ACL_ENTRIES);
|
|
Path absF = fixRelativePart(path);
|
|
Path absF = fixRelativePart(path);
|
|
new FileSystemLinkResolver<Void>() {
|
|
new FileSystemLinkResolver<Void>() {
|
|
@Override
|
|
@Override
|
|
@@ -2042,6 +2107,8 @@ public class DistributedFileSystem extends FileSystem {
|
|
*/
|
|
*/
|
|
@Override
|
|
@Override
|
|
public void removeDefaultAcl(Path path) throws IOException {
|
|
public void removeDefaultAcl(Path path) throws IOException {
|
|
|
|
+ statistics.incrementWriteOps(1);
|
|
|
|
+ storageStatistics.incrementOpCounter(OpType.REMOVE_DEFAULT_ACL);
|
|
final Path absF = fixRelativePart(path);
|
|
final Path absF = fixRelativePart(path);
|
|
new FileSystemLinkResolver<Void>() {
|
|
new FileSystemLinkResolver<Void>() {
|
|
@Override
|
|
@Override
|
|
@@ -2062,6 +2129,8 @@ public class DistributedFileSystem extends FileSystem {
|
|
*/
|
|
*/
|
|
@Override
|
|
@Override
|
|
public void removeAcl(Path path) throws IOException {
|
|
public void removeAcl(Path path) throws IOException {
|
|
|
|
+ statistics.incrementWriteOps(1);
|
|
|
|
+ storageStatistics.incrementOpCounter(OpType.REMOVE_ACL);
|
|
final Path absF = fixRelativePart(path);
|
|
final Path absF = fixRelativePart(path);
|
|
new FileSystemLinkResolver<Void>() {
|
|
new FileSystemLinkResolver<Void>() {
|
|
@Override
|
|
@Override
|
|
@@ -2083,6 +2152,8 @@ public class DistributedFileSystem extends FileSystem {
|
|
@Override
|
|
@Override
|
|
public void setAcl(Path path, final List<AclEntry> aclSpec)
|
|
public void setAcl(Path path, final List<AclEntry> aclSpec)
|
|
throws IOException {
|
|
throws IOException {
|
|
|
|
+ statistics.incrementWriteOps(1);
|
|
|
|
+ storageStatistics.incrementOpCounter(OpType.SET_ACL);
|
|
Path absF = fixRelativePart(path);
|
|
Path absF = fixRelativePart(path);
|
|
new FileSystemLinkResolver<Void>() {
|
|
new FileSystemLinkResolver<Void>() {
|
|
@Override
|
|
@Override
|
|
@@ -2181,6 +2252,8 @@ public class DistributedFileSystem extends FileSystem {
|
|
@Override
|
|
@Override
|
|
public void setXAttr(Path path, final String name, final byte[] value,
|
|
public void setXAttr(Path path, final String name, final byte[] value,
|
|
final EnumSet<XAttrSetFlag> flag) throws IOException {
|
|
final EnumSet<XAttrSetFlag> flag) throws IOException {
|
|
|
|
+ statistics.incrementWriteOps(1);
|
|
|
|
+ storageStatistics.incrementOpCounter(OpType.SET_XATTR);
|
|
Path absF = fixRelativePart(path);
|
|
Path absF = fixRelativePart(path);
|
|
new FileSystemLinkResolver<Void>() {
|
|
new FileSystemLinkResolver<Void>() {
|
|
|
|
|
|
@@ -2200,6 +2273,8 @@ public class DistributedFileSystem extends FileSystem {
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public byte[] getXAttr(Path path, final String name) throws IOException {
|
|
public byte[] getXAttr(Path path, final String name) throws IOException {
|
|
|
|
+ statistics.incrementReadOps(1);
|
|
|
|
+ storageStatistics.incrementOpCounter(OpType.GET_XATTR);
|
|
final Path absF = fixRelativePart(path);
|
|
final Path absF = fixRelativePart(path);
|
|
return new FileSystemLinkResolver<byte[]>() {
|
|
return new FileSystemLinkResolver<byte[]>() {
|
|
@Override
|
|
@Override
|
|
@@ -2265,6 +2340,8 @@ public class DistributedFileSystem extends FileSystem {
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public void removeXAttr(Path path, final String name) throws IOException {
|
|
public void removeXAttr(Path path, final String name) throws IOException {
|
|
|
|
+ statistics.incrementWriteOps(1);
|
|
|
|
+ storageStatistics.incrementOpCounter(OpType.REMOVE_XATTR);
|
|
Path absF = fixRelativePart(path);
|
|
Path absF = fixRelativePart(path);
|
|
new FileSystemLinkResolver<Void>() {
|
|
new FileSystemLinkResolver<Void>() {
|
|
@Override
|
|
@Override
|
|
@@ -2421,4 +2498,5 @@ public class DistributedFileSystem extends FileSystem {
|
|
Statistics getFsStatistics() {
|
|
Statistics getFsStatistics() {
|
|
return statistics;
|
|
return statistics;
|
|
}
|
|
}
|
|
|
|
+
|
|
}
|
|
}
|