
HADOOP-17065. Add Network Counters to ABFS (#2056)

Contributed by Mehakmeet Singh.
Mehakmeet Singh 4 years ago
parent
commit
bbd3278d09
14 changed files with 430 additions and 34 deletions
  1. + 10 - 3
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsCountersImpl.java
  2. + 18 - 2
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsStatistic.java
  3. + 8 - 7
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
  4. + 9 - 6
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
  5. + 20 - 5
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
  6. + 5 - 2
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java
  7. + 11 - 3
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java
  8. + 23 - 1
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
  9. + 2 - 1
      hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
  10. + 253 - 0
      hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java
  11. + 1 - 1
      hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsStatistics.java
  12. + 67 - 0
      hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsNetworkStatistics.java
  13. + 1 - 1
      hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsStatistics.java
  14. + 2 - 2
      hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java
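
Taken together, the patch threads a single AbfsCounters instance from AzureBlobFileSystem through AzureBlobFileSystemStore and AbfsClient into AbfsRestOperation, so every HTTP round trip can update the new network statistics. As a rough usage sketch, not part of this commit: it relies only on the toString() change shown in the AzureBlobFileSystem.java diff below, and the account/container URI is a placeholder.

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class AbfsCounterDump {
  public static void main(String[] args) throws Exception {
    // Placeholder ABFS URI; any configured account/container works.
    FileSystem fs = FileSystem.get(
        new URI("abfs://container@account.dfs.core.windows.net"),
        new Configuration());
    // A create + close issues a handful of requests against the store.
    fs.create(new Path("/tmp/counter-demo")).close();
    // AzureBlobFileSystem#toString() appends "Statistics: {...}" built via
    // AbfsCounters#formString(), which now includes connections_made,
    // send_requests, get_responses, bytes_sent, bytes_received,
    // read_throttles and write_throttles.
    System.out.println(fs);
  }
}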

+ 10 - 3
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsInstrumentation.java → hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsCountersImpl.java

@@ -41,7 +41,7 @@ import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.*;
 /**
  * Instrumentation of Abfs counters.
  */
-public class AbfsInstrumentation implements AbfsCounters {
+public class AbfsCountersImpl implements AbfsCounters {
 
   /**
    * Single context for all the Abfs counters to separate them from other
@@ -78,10 +78,17 @@ public class AbfsInstrumentation implements AbfsCounters {
      DIRECTORIES_DELETED,
      FILES_CREATED,
      FILES_DELETED,
-      ERROR_IGNORED
+      ERROR_IGNORED,
+      CONNECTIONS_MADE,
+      SEND_REQUESTS,
+      GET_RESPONSES,
+      BYTES_SENT,
+      BYTES_RECEIVED,
+      READ_THROTTLES,
+      WRITE_THROTTLES
   };
 
-  public AbfsInstrumentation(URI uri) {
+  public AbfsCountersImpl(URI uri) {
     UUID fileSystemInstanceId = UUID.randomUUID();
     registry.tag(REGISTRY_ID,
         "A unique identifier for the instance",

+ 18 - 2
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsStatistic.java

@@ -22,7 +22,7 @@ import org.apache.hadoop.fs.StorageStatistics.CommonStatisticNames;
 
 /**
  * Statistic which are collected in Abfs.
- * Available as metrics in {@link AbfsInstrumentation}.
+ * Available as metrics in {@link AbfsCountersImpl}.
  */
 public enum AbfsStatistic {
 
@@ -57,7 +57,23 @@ public enum AbfsStatistic {
   FILES_DELETED("files_deleted",
       "Total number of files deleted from the object store."),
   ERROR_IGNORED("error_ignored",
-      "Errors caught and ignored.");
+      "Errors caught and ignored."),
+
+  //Network statistics.
+  CONNECTIONS_MADE("connections_made",
+      "Total number of times a connection was made with the data store."),
+  SEND_REQUESTS("send_requests",
+      "Total number of times http requests were sent to the data store."),
+  GET_RESPONSES("get_responses",
+      "Total number of times a response was received."),
+  BYTES_SENT("bytes_sent",
+      "Total bytes uploaded."),
+  BYTES_RECEIVED("bytes_received",
+      "Total bytes received."),
+  READ_THROTTLES("read_throttles",
+      "Total number of times a read operation is throttled."),
+  WRITE_THROTTLES("write_throttles",
+      "Total number of times a write operation is throttled.");
 
   private String statName;
   private String statDescription;
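
Each new constant pairs a metric name with a description, and AbfsCountersImpl registers the whole STATISTIC_LIST as counters. A minimal sketch of that registration idea; the MetricsRegistry.newCounter call and the getStatDescription() getter are assumptions modelled on the existing counter handling, not lines from this commit.

import org.apache.hadoop.fs.azurebfs.AbfsStatistic;
import org.apache.hadoop.metrics2.lib.MetricsRegistry;

public class NetworkCounterRegistrationSketch {
  // Sketch only: turn the new network statistics into long counters.
  static void registerNetworkCounters(MetricsRegistry registry) {
    for (AbfsStatistic stat : new AbfsStatistic[] {
        AbfsStatistic.CONNECTIONS_MADE, AbfsStatistic.SEND_REQUESTS,
        AbfsStatistic.GET_RESPONSES, AbfsStatistic.BYTES_SENT,
        AbfsStatistic.BYTES_RECEIVED, AbfsStatistic.READ_THROTTLES,
        AbfsStatistic.WRITE_THROTTLES}) {
      // getStatName() is used elsewhere in this patch; getStatDescription()
      // is assumed here as the natural getter for statDescription.
      registry.newCounter(stat.getStatName(), stat.getStatDescription(), 0L);
    }
  }
}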

+ 8 - 7
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java

@@ -97,7 +97,7 @@ public class AzureBlobFileSystem extends FileSystem {
 
   private boolean delegationTokenEnabled = false;
   private AbfsDelegationTokenManager delegationTokenManager;
-  private AbfsCounters instrumentation;
+  private AbfsCounters abfsCounters;
 
   @Override
   public void initialize(URI uri, Configuration configuration)
@@ -109,11 +109,12 @@ public class AzureBlobFileSystem extends FileSystem {
     LOG.debug("Initializing AzureBlobFileSystem for {}", uri);
 
     this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
-    this.abfsStore = new AzureBlobFileSystemStore(uri, this.isSecureScheme(), configuration);
+    abfsCounters = new AbfsCountersImpl(uri);
+    this.abfsStore = new AzureBlobFileSystemStore(uri, this.isSecureScheme(),
+        configuration, abfsCounters);
     LOG.trace("AzureBlobFileSystemStore init complete");
 
     final AbfsConfiguration abfsConfiguration = abfsStore.getAbfsConfiguration();
-    instrumentation = new AbfsInstrumentation(uri);
     this.setWorkingDirectory(this.getHomeDirectory());
 
     if (abfsConfiguration.getCreateRemoteFileSystemDuringInitialization()) {
@@ -150,8 +151,8 @@ public class AzureBlobFileSystem extends FileSystem {
     sb.append("uri=").append(uri);
     sb.append(", user='").append(abfsStore.getUser()).append('\'');
     sb.append(", primaryUserGroup='").append(abfsStore.getPrimaryGroup()).append('\'');
-    if (instrumentation != null) {
-      sb.append(", Statistics: {").append(instrumentation.formString("{", "=",
+    if (abfsCounters != null) {
+      sb.append(", Statistics: {").append(abfsCounters.formString("{", "=",
           "}", true));
       sb.append("}");
     }
@@ -392,7 +393,7 @@ public class AzureBlobFileSystem extends FileSystem {
    * @param statistic the Statistic to be incremented.
    */
   private void incrementStatistic(AbfsStatistic statistic) {
-    instrumentation.incrementCounter(statistic, 1);
+    abfsCounters.incrementCounter(statistic, 1);
   }
 
   /**
@@ -1241,7 +1242,7 @@ public class AzureBlobFileSystem extends FileSystem {
 
   @VisibleForTesting
   Map<String, Long> getInstrumentationMap() {
-    return instrumentation.toMap();
+    return abfsCounters.toMap();
   }
 
   @Override

+ 9 - 6
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java

@@ -83,6 +83,7 @@ import org.apache.hadoop.fs.azurebfs.oauth2.IdentityTransformer;
 import org.apache.hadoop.fs.azurebfs.oauth2.IdentityTransformerInterface;
 import org.apache.hadoop.fs.azurebfs.services.AbfsAclHelper;
 import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
+import org.apache.hadoop.fs.azurebfs.services.AbfsCounters;
 import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
 import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
 import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamContext;
@@ -144,8 +145,9 @@ public class AzureBlobFileSystemStore implements Closeable {
   private final IdentityTransformerInterface identityTransformer;
   private final AbfsPerfTracker abfsPerfTracker;
 
-  public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme, Configuration configuration)
-          throws IOException {
+  public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme,
+                                  Configuration configuration,
+                                  AbfsCounters abfsCounters) throws IOException {
     this.uri = uri;
     String[] authorityParts = authorityParts(uri);
     final String fileSystemName = authorityParts[0];
@@ -183,7 +185,7 @@ public class AzureBlobFileSystemStore implements Closeable {
     boolean usingOauth = (authType == AuthType.OAuth);
     boolean useHttps = (usingOauth || abfsConfiguration.isHttpsAlwaysUsed()) ? true : isSecureScheme;
     this.abfsPerfTracker = new AbfsPerfTracker(fileSystemName, accountName, this.abfsConfiguration);
-    initializeClient(uri, fileSystemName, accountName, useHttps);
+    initializeClient(uri, fileSystemName, accountName, useHttps, abfsCounters);
     final Class<? extends IdentityTransformerInterface> identityTransformerClass =
         configuration.getClass(FS_AZURE_IDENTITY_TRANSFORM_CLASS, IdentityTransformer.class,
             IdentityTransformerInterface.class);
@@ -1171,7 +1173,8 @@ public class AzureBlobFileSystemStore implements Closeable {
     return isKeyForDirectorySet(key, azureAtomicRenameDirSet);
   }
 
-  private void initializeClient(URI uri, String fileSystemName, String accountName, boolean isSecure)
+  private void initializeClient(URI uri, String fileSystemName,
+      String accountName, boolean isSecure, AbfsCounters abfsCounters)
       throws IOException {
     if (this.client != null) {
       return;
@@ -1219,11 +1222,11 @@ public class AzureBlobFileSystemStore implements Closeable {
     if (tokenProvider != null) {
       this.client = new AbfsClient(baseUrl, creds, abfsConfiguration,
           new ExponentialRetryPolicy(abfsConfiguration.getMaxIoRetries()),
-          tokenProvider, abfsPerfTracker);
+          tokenProvider, abfsPerfTracker, abfsCounters);
     } else {
       this.client = new AbfsClient(baseUrl, creds, abfsConfiguration,
           new ExponentialRetryPolicy(abfsConfiguration.getMaxIoRetries()),
-          sasTokenProvider, abfsPerfTracker);
+          sasTokenProvider, abfsPerfTracker, abfsCounters);
     }
     LOG.trace("AbfsClient init complete");
   }

+ 20 - 5
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java

@@ -73,11 +73,13 @@ public class AbfsClient implements Closeable {
   private final AuthType authType;
   private AccessTokenProvider tokenProvider;
   private SASTokenProvider sasTokenProvider;
+  private final AbfsCounters abfsCounters;
 
   private AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
                     final AbfsConfiguration abfsConfiguration,
                     final ExponentialRetryPolicy exponentialRetryPolicy,
-                    final AbfsPerfTracker abfsPerfTracker) {
+                    final AbfsPerfTracker abfsPerfTracker,
+                    final AbfsCounters abfsCounters) {
     this.baseUrl = baseUrl;
     this.sharedKeyCredentials = sharedKeyCredentials;
     String baseUrlString = baseUrl.toString();
@@ -104,14 +106,17 @@
 
     this.userAgent = initializeUserAgent(abfsConfiguration, sslProviderName);
     this.abfsPerfTracker = abfsPerfTracker;
+    this.abfsCounters = abfsCounters;
   }
 
   public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
                     final AbfsConfiguration abfsConfiguration,
                     final ExponentialRetryPolicy exponentialRetryPolicy,
                     final AccessTokenProvider tokenProvider,
-                    final AbfsPerfTracker abfsPerfTracker) {
-    this(baseUrl, sharedKeyCredentials, abfsConfiguration, exponentialRetryPolicy, abfsPerfTracker);
+                    final AbfsPerfTracker abfsPerfTracker,
+                    final AbfsCounters abfsCounters) {
+    this(baseUrl, sharedKeyCredentials, abfsConfiguration,
+        exponentialRetryPolicy, abfsPerfTracker, abfsCounters);
     this.tokenProvider = tokenProvider;
   }
 
@@ -119,8 +124,10 @@
                     final AbfsConfiguration abfsConfiguration,
                     final ExponentialRetryPolicy exponentialRetryPolicy,
                     final SASTokenProvider sasTokenProvider,
-                    final AbfsPerfTracker abfsPerfTracker) {
-    this(baseUrl, sharedKeyCredentials, abfsConfiguration, exponentialRetryPolicy, abfsPerfTracker);
+                    final AbfsPerfTracker abfsPerfTracker,
+                    final AbfsCounters abfsCounters) {
+    this(baseUrl, sharedKeyCredentials, abfsConfiguration,
+        exponentialRetryPolicy, abfsPerfTracker, abfsCounters);
     this.sasTokenProvider = sasTokenProvider;
   }
 
@@ -892,4 +899,12 @@ public class AbfsClient implements Closeable {
   public SASTokenProvider getSasTokenProvider() {
     return this.sasTokenProvider;
   }
+
+  /**
+   * Getter for abfsCounters from AbfsClient.
+   * @return AbfsCounters instance.
+   */
+  protected AbfsCounters getAbfsCounters() {
+    return abfsCounters;
+  }
 }
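
All three AbfsClient constructors now take the counters as their final parameter, and null is tolerated because AbfsRestOperation guards every increment. A hedged construction sketch modelled on the updated TestAbfsClient call; the nulls stand in for shared-key credentials, retry policy, token provider and perf tracker, and the container URI is a placeholder.

import java.net.URI;
import java.net.URL;

import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.fs.azurebfs.AbfsCountersImpl;
import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsCounters;

public class AbfsClientConstructionSketch {
  static AbfsClient withCounters(URL baseUrl, AbfsConfiguration conf)
      throws Exception {
    // The counters become the last constructor argument after this patch.
    AbfsCounters counters = new AbfsCountersImpl(
        new URI("abfs://container@account.dfs.core.windows.net"));
    return new AbfsClient(baseUrl, null, conf, null,
        (AccessTokenProvider) null, null, counters);
  }

  static AbfsClient withoutCounters(URL baseUrl, AbfsConfiguration conf) {
    // Passing null simply disables the new statistics.
    return new AbfsClient(baseUrl, null, conf, null,
        (AccessTokenProvider) null, null, null);
  }
}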

+ 5 - 2
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java

@@ -114,16 +114,19 @@ class AbfsClientThrottlingAnalyzer {
 
   /**
    * Suspends the current storage operation, as necessary, to reduce throughput.
+   * @return true if Thread sleeps(Throttling occurs) else false.
    */
-  public void suspendIfNecessary() {
+  public boolean suspendIfNecessary() {
     int duration = sleepDuration;
     if (duration > 0) {
       try {
         Thread.sleep(duration);
+        return true;
       } catch (InterruptedException ie) {
         Thread.currentThread().interrupt();
       }
     }
+    return false;
   }
 
   @VisibleForTesting
@@ -269,4 +272,4 @@ class AbfsClientThrottlingAnalyzer {
       this.operationsSuccessful = new AtomicLong();
     }
   }
-}
+}

+ 11 - 3
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java

@@ -23,6 +23,7 @@ import java.net.HttpURLConnection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.fs.azurebfs.AbfsStatistic;
 import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
 
 /**
@@ -103,17 +104,24 @@ public final class AbfsClientThrottlingIntercept {
    * uses this to suspend the request, if necessary, to minimize errors and
    * maximize throughput.
    */
-  static void sendingRequest(AbfsRestOperationType operationType) {
+  static void sendingRequest(AbfsRestOperationType operationType,
+      AbfsCounters abfsCounters) {
     if (!isAutoThrottlingEnabled) {
       return;
     }
 
     switch (operationType) {
       case ReadFile:
-        singleton.readThrottler.suspendIfNecessary();
+        if (singleton.readThrottler.suspendIfNecessary()
+            && abfsCounters != null) {
+          abfsCounters.incrementCounter(AbfsStatistic.READ_THROTTLES, 1);
+        }
         break;
       case Append:
-        singleton.writeThrottler.suspendIfNecessary();
+        if (singleton.writeThrottler.suspendIfNecessary()
+            && abfsCounters != null) {
+          abfsCounters.incrementCounter(AbfsStatistic.WRITE_THROTTLES, 1);
+        }
         break;
       default:
        break;

+ 23 - 1
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java

@@ -27,6 +27,7 @@ import java.util.List;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.fs.azurebfs.AbfsStatistic;
 import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
@@ -66,6 +67,7 @@ public class AbfsRestOperation {
   private int retryCount = 0;
 
   private AbfsHttpOperation result;
+  private AbfsCounters abfsCounters;
 
   public AbfsHttpOperation getResult() {
     return result;
@@ -131,6 +133,7 @@ public class AbfsRestOperation {
     this.hasRequestBody = (AbfsHttpConstants.HTTP_METHOD_PUT.equals(method)
             || AbfsHttpConstants.HTTP_METHOD_PATCH.equals(method));
     this.sasToken = sasToken;
+    this.abfsCounters = client.getAbfsCounters();
   }
 
   /**
@@ -160,6 +163,7 @@ public class AbfsRestOperation {
     this.buffer = buffer;
     this.bufferOffset = bufferOffset;
     this.bufferLength = bufferLength;
+    this.abfsCounters = client.getAbfsCounters();
   }
 
   /**
@@ -205,6 +209,7 @@ public class AbfsRestOperation {
     try {
       // initialize the HTTP request and open the connection
       httpOperation = new AbfsHttpOperation(url, method, requestHeaders);
+      incrementCounter(AbfsStatistic.CONNECTIONS_MADE, 1);
 
       switch(client.getAuthType()) {
         case Custom:
@@ -229,14 +234,19 @@ public class AbfsRestOperation {
       // dump the headers
       AbfsIoUtils.dumpHeadersToDebugLog("Request Headers",
           httpOperation.getConnection().getRequestProperties());
-      AbfsClientThrottlingIntercept.sendingRequest(operationType);
+      AbfsClientThrottlingIntercept.sendingRequest(operationType, abfsCounters);
 
       if (hasRequestBody) {
         // HttpUrlConnection requires
         httpOperation.sendRequest(buffer, bufferOffset, bufferLength);
+        incrementCounter(AbfsStatistic.SEND_REQUESTS, 1);
+        incrementCounter(AbfsStatistic.BYTES_SENT, bufferLength);
       }
 
       httpOperation.processResponse(buffer, bufferOffset, bufferLength);
+      incrementCounter(AbfsStatistic.GET_RESPONSES, 1);
+      incrementCounter(AbfsStatistic.BYTES_RECEIVED,
+          httpOperation.getBytesReceived());
     } catch (IOException ex) {
       if (ex instanceof UnknownHostException) {
         LOG.warn(String.format("Unknown host name: %s. Retrying to resolve the host name...", httpOperation.getUrl().getHost()));
@@ -276,4 +286,16 @@ public class AbfsRestOperation {
 
     return true;
   }
+
+  /**
+   * Incrementing Abfs counters with a long value.
+   *
+   * @param statistic the Abfs statistic that needs to be incremented.
+   * @param value     the value to be incremented by.
+   */
+  private void incrementCounter(AbfsStatistic statistic, long value) {
+    if (abfsCounters != null) {
+      abfsCounters.incrementCounter(statistic, value);
+    }
+  }
 }
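
With the increments above, one request that carries a body bumps connections_made, send_requests and bytes_sent on the way out, then get_responses and bytes_received once the response is processed; an hflush() therefore costs two of each, since it triggers an append followed by a flush. A test-style sketch of that accounting, where fs, out and testData are assumed in-scope fixtures (an AzureBlobFileSystem, an AbfsOutputStream and a byte[]), as in the integration tests below:

// Sketch: one hflush() == append + flush == two tracked requests.
Map<String, Long> before = fs.getInstrumentationMap();
out.write(testData);
out.hflush();
Map<String, Long> after = fs.getInstrumentationMap();
long newConnections = after.get(AbfsStatistic.CONNECTIONS_MADE.getStatName())
    - before.get(AbfsStatistic.CONNECTIONS_MADE.getStatName());
// Expected to be 2 for a single flushed write, matching the comments in
// ITestAbfsNetworkStatistics below.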

+ 2 - 1
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java

@@ -436,9 +436,10 @@ public abstract class AbstractAbfsIntegrationTest extends
    * @param metricMap map of (String, Long) with statistics name as key and
    *                  statistics value as map value.
    */
-  protected void assertAbfsStatistics(AbfsStatistic statistic,
+  protected long assertAbfsStatistics(AbfsStatistic statistic,
       long expectedValue, Map<String, Long> metricMap) {
     assertEquals("Mismatch in " + statistic.getStatName(), expectedValue,
         (long) metricMap.get(statistic.getStatName()));
+    return expectedValue;
   }
 }
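
Returning the expected value lets a test capture the running total from one assertion and build the next expectation on top of it, which is exactly how ITestAbfsNetworkStatistics below chains its checks. In miniature, with metricMap, fs and LARGE_OPERATIONS taken from that test and the values purely illustrative:

// Capture the asserted total, adjust for the stream close, then assert the
// next phase relative to it.
long connectionsMade =
    assertAbfsStatistics(AbfsStatistic.CONNECTIONS_MADE, 6, metricMap);
connectionsMade++;  // closing the output stream makes one more connection
assertAbfsStatistics(AbfsStatistic.CONNECTIONS_MADE,
    connectionsMade + 1 + LARGE_OPERATIONS * 2, fs.getInstrumentationMap());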

+ 253 - 0
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java

@@ -0,0 +1,253 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.junit.Test;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
+import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
+
+public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ITestAbfsNetworkStatistics.class);
+  private static final int LARGE_OPERATIONS = 10;
+
+  public ITestAbfsNetworkStatistics() throws Exception {
+  }
+
+  /**
+   * Testing connections_made, send_request and bytes_send statistics in
+   * {@link AbfsRestOperation}.
+   */
+  @Test
+  public void testAbfsHttpSendStatistics() throws IOException {
+    describe("Test to check correct values of statistics after Abfs http send "
+        + "request is done.");
+
+    AzureBlobFileSystem fs = getFileSystem();
+    Map<String, Long> metricMap;
+    Path sendRequestPath = path(getMethodName());
+    String testNetworkStatsString = "http_send";
+    long connectionsMade, requestsSent, bytesSent;
+
+    /*
+     * Creating AbfsOutputStream will result in 1 connection made and 1 send
+     * request.
+     */
+    try (AbfsOutputStream out = createAbfsOutputStreamWithFlushEnabled(fs,
+        sendRequestPath)) {
+      out.write(testNetworkStatsString.getBytes());
+
+      /*
+       * Flushes all outstanding data (i.e. the current unfinished packet)
+       * from the client into the service on all DataNode replicas.
+       */
+      out.hflush();
+
+      metricMap = fs.getInstrumentationMap();
+
+      /*
+       * Testing the network stats with 1 write operation.
+       *
+       * connections_made : 3(getFileSystem()) + 1(AbfsOutputStream) + 2(flush).
+       *
+       * send_requests : 1(getFileSystem()) + 1(AbfsOutputStream) + 2(flush).
+       *
+       * bytes_sent : bytes wrote in AbfsOutputStream.
+       */
+      connectionsMade = assertAbfsStatistics(AbfsStatistic.CONNECTIONS_MADE,
+          6, metricMap);
+      requestsSent = assertAbfsStatistics(AbfsStatistic.SEND_REQUESTS, 4,
+          metricMap);
+      bytesSent = assertAbfsStatistics(AbfsStatistic.BYTES_SENT,
+          testNetworkStatsString.getBytes().length, metricMap);
+
+    }
+
+    // To close the AbfsOutputStream 1 connection is made and 1 request is sent.
+    connectionsMade++;
+    requestsSent++;
+
+    try (AbfsOutputStream out = createAbfsOutputStreamWithFlushEnabled(fs,
+        sendRequestPath)) {
+
+      for (int i = 0; i < LARGE_OPERATIONS; i++) {
+        out.write(testNetworkStatsString.getBytes());
+
+        /*
+         * 1 flush call would create 2 connections and 2 send requests.
+         * when hflush() is called it will essentially trigger append() and
+         * flush() inside AbfsRestOperation. Both of which calls
+         * executeHttpOperation() method which creates a connection and sends
+         * requests.
+         */
+        out.hflush();
+      }
+
+      metricMap = fs.getInstrumentationMap();
+
+      /*
+       * Testing the network stats with Large amount of bytes sent.
+       *
+       * connections made : connections_made(Last assertion) + 1
+       * (AbfsOutputStream) + LARGE_OPERATIONS * 2(flush).
+       *
+       * send requests : requests_sent(Last assertion) + 1(AbfsOutputStream) +
+       * LARGE_OPERATIONS * 2(flush).
+       *
+       * bytes sent : bytes_sent(Last assertion) + LARGE_OPERATIONS * (bytes
+       * wrote each time).
+       *
+       */
+      assertAbfsStatistics(AbfsStatistic.CONNECTIONS_MADE,
+          connectionsMade + 1 + LARGE_OPERATIONS * 2, metricMap);
+      assertAbfsStatistics(AbfsStatistic.SEND_REQUESTS,
+          requestsSent + 1 + LARGE_OPERATIONS * 2, metricMap);
+      assertAbfsStatistics(AbfsStatistic.BYTES_SENT,
+          bytesSent + LARGE_OPERATIONS * (testNetworkStatsString.getBytes().length),
+          metricMap);
+
+    }
+
+  }
+
+  /**
+   * Testing get_response and bytes_received in {@link AbfsRestOperation}.
+   */
+  @Test
+  public void testAbfsHttpResponseStatistics() throws IOException {
+    describe("Test to check correct values of statistics after Http "
+        + "Response is processed.");
+
+    AzureBlobFileSystem fs = getFileSystem();
+    Path getResponsePath = path(getMethodName());
+    Map<String, Long> metricMap;
+    String testResponseString = "some response";
+    long getResponses, bytesReceived;
+
+    FSDataOutputStream out = null;
+    FSDataInputStream in = null;
+    try {
+
+      /*
+       * Creating a File and writing some bytes in it.
+       *
+       * get_response : 3(getFileSystem) + 1(OutputStream creation) + 2
+       * (Writing data in Data store).
+       *
+       */
+      out = fs.create(getResponsePath);
+      out.write(testResponseString.getBytes());
+      out.hflush();
+
+      // open would require 1 get response.
+      in = fs.open(getResponsePath);
+      // read would require 1 get response and also get the bytes received.
+      int result = in.read();
+
+      // Confirming read isn't -1.
+      LOG.info("Result of read operation : {}", result);
+
+      metricMap = fs.getInstrumentationMap();
+
+      /*
+       * Testing values of statistics after writing and reading a buffer.
+       *
+       * get_responses - 6(above operations) + 1(open()) + 1 (read()).
+       *
+       * bytes_received - This should be equal to bytes sent earlier.
+       */
+      getResponses = assertAbfsStatistics(AbfsStatistic.GET_RESPONSES, 8,
+          metricMap);
+      // Testing that bytes received is equal to bytes sent.
+      long bytesSend = metricMap.get(AbfsStatistic.BYTES_SENT.getStatName());
+      bytesReceived = assertAbfsStatistics(AbfsStatistic.BYTES_RECEIVED,
+          bytesSend,
+          metricMap);
+
+    } finally {
+      IOUtils.cleanupWithLogger(LOG, out, in);
+    }
+
+    // To close the streams 1 response is received.
+    getResponses++;
+
+    try {
+
+      /*
+       * Creating a file and writing buffer into it. Also recording the
+       * buffer for future read() call.
+       * This creating outputStream and writing requires 2 *
+       * (LARGE_OPERATIONS) get requests.
+       */
+      StringBuilder largeBuffer = new StringBuilder();
+      out = fs.create(getResponsePath);
+      for (int i = 0; i < LARGE_OPERATIONS; i++) {
+        out.write(testResponseString.getBytes());
+        out.hflush();
+        largeBuffer.append(testResponseString);
+      }
+
+      // Open requires 1 get_response.
+      in = fs.open(getResponsePath);
+
+      /*
+       * Reading the file which was written above. This read() call would
+       * read bytes equal to the bytes that was written above.
+       * Get response would be 1 only.
+       */
+      in.read(0, largeBuffer.toString().getBytes(), 0,
+          largeBuffer.toString().getBytes().length);
+
+      metricMap = fs.getInstrumentationMap();
+
+      /*
+       * Testing the statistics values after writing and reading a large buffer.
+       *
+       * get_response : get_responses(Last assertion) + 1
+       * (OutputStream) + 2 * LARGE_OPERATIONS(Writing and flushing
+       * LARGE_OPERATIONS times) + 1(open()) + 1(read()).
+       *
+       * bytes_received : bytes_received(Last assertion) + LARGE_OPERATIONS *
+       * bytes wrote each time (bytes_received is equal to bytes wrote in the
+       * File).
+       *
+       */
+      assertAbfsStatistics(AbfsStatistic.BYTES_RECEIVED,
+          bytesReceived + LARGE_OPERATIONS * (testResponseString.getBytes().length),
+          metricMap);
+      assertAbfsStatistics(AbfsStatistic.GET_RESPONSES,
+          getResponses + 3 + 2 * LARGE_OPERATIONS, metricMap);
+
+    } finally {
+      IOUtils.cleanupWithLogger(LOG, out, in);
+    }
+  }
+
+}

+ 1 - 1
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsStatistics.java

@@ -45,7 +45,7 @@ public class ITestAbfsStatistics extends AbstractAbfsIntegrationTest {
     describe("Testing the initial values of Abfs counters");
 
     AbfsCounters abfsCounters =
-        new AbfsInstrumentation(getFileSystem().getUri());
+        new AbfsCountersImpl(getFileSystem().getUri());
     Map<String, Long> metricMap = abfsCounters.toMap();
 
     for (Map.Entry<String, Long> entry : metricMap.entrySet()) {

+ 67 - 0
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsNetworkStatistics.java

@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.junit.Test;
+
+import org.apache.hadoop.fs.azurebfs.services.AbfsCounters;
+
+public class TestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
+
+  private static final int LARGE_OPERATIONS = 1000;
+
+  public TestAbfsNetworkStatistics() throws Exception {
+  }
+
+  /**
+   * Test to check correct values of read and write throttling statistics in
+   * {@code AbfsClientThrottlingAnalyzer}.
+   */
+  @Test
+  public void testAbfsThrottlingStatistics() throws IOException {
+    describe("Test to check correct values of read throttle and write "
+        + "throttle statistics in Abfs");
+
+    AbfsCounters statistics =
+        new AbfsCountersImpl(getFileSystem().getUri());
+
+    /*
+     * Calling the throttle methods to check correct summation and values of
+     * the counters.
+     */
+    for (int i = 0; i < LARGE_OPERATIONS; i++) {
+      statistics.incrementCounter(AbfsStatistic.READ_THROTTLES, 1);
+      statistics.incrementCounter(AbfsStatistic.WRITE_THROTTLES, 1);
+    }
+
+    Map<String, Long> metricMap = statistics.toMap();
+
+    /*
+     * Test to check read and write throttle statistics gave correct values for
+     * 1000 calls.
+     */
+    assertAbfsStatistics(AbfsStatistic.READ_THROTTLES, LARGE_OPERATIONS,
+        metricMap);
+    assertAbfsStatistics(AbfsStatistic.WRITE_THROTTLES, LARGE_OPERATIONS,
+        metricMap);
+  }
+}

+ 1 - 1
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsStatistics.java

@@ -43,7 +43,7 @@ public class TestAbfsStatistics extends AbstractAbfsIntegrationTest {
     describe("Testing the counter values after Abfs is initialised");
 
     AbfsCounters instrumentation =
-        new AbfsInstrumentation(getFileSystem().getUri());
+        new AbfsCountersImpl(getFileSystem().getUri());
 
     //Testing summation of the counter values.
     for (int i = 0; i < LARGE_OPS; i++) {

+ 2 - 2
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java

@@ -100,7 +100,7 @@ public final class TestAbfsClient {
   private String getUserAgentString(AbfsConfiguration config,
       boolean includeSSLProvider) throws MalformedURLException {
     AbfsClient client = new AbfsClient(new URL("https://azure.com"), null,
-        config, null, (AccessTokenProvider) null, null);
+        config, null, (AccessTokenProvider) null, null, null);
     String sslProviderName = null;
     if (includeSSLProvider) {
       sslProviderName = DelegatingSSLSocketFactory.getDefaultFactory()
@@ -267,7 +267,7 @@ public final class TestAbfsClient {
         (currentAuthType == AuthType.OAuth
             ? abfsConfig.getTokenProvider()
            : null),
-        tracker);
+        tracker, null);
 
     return testClient;
   }