Browse Source

HADOOP-19311: [ABFS] Implement Backoff and Read Footer metrics using IOStatistics Class (#7122)

Contributed by: Manish Bhatt <bhattmanish98>

Reviewed by: Saikat roy <saikatroy038> and Anuj Modi <anujmodi@apache.org>
Signed-off by: Anuj Modi
Manish Bhatt 3 months ago
parent
commit
57d0979d38
19 changed files with 1751 additions and 883 deletions
  1. 0 312
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsBackoffMetrics.java
  2. 5 7
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsCountersImpl.java
  3. 121 0
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/MetricsConstants.java
  4. 110 0
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/enums/AbfsBackoffMetricsEnum.java
  5. 97 0
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/enums/AbfsReadFooterMetricsEnum.java
  6. 36 0
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/enums/FileType.java
  7. 83 0
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/enums/RetryValue.java
  8. 37 0
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/enums/StatisticTypeEnum.java
  9. 323 0
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBackoffMetrics.java
  10. 0 1
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsCounters.java
  11. 1 1
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
  12. 486 498
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsReadFooterMetrics.java
  13. 34 48
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
  14. 153 0
      hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbstractAbfsStatisticsSource.java
  15. 25 14
      hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadFooterMetrics.java
  16. 115 0
      hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsBackoffMetrics.java
  17. 98 0
      hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsReadFooterMetrics.java
  18. 3 2
      hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRestOperation.java
  19. 24 0
      hadoop-tools/hadoop-azure/src/test/resources/accountSettings/accountName_settings.xml.template

+ 0 - 312
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsBackoffMetrics.java

@@ -1,312 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.azurebfs;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HUNDRED;
-import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.THOUSAND;
-
-public class AbfsBackoffMetrics {
-
-  private AtomicLong numberOfRequestsSucceeded;
-
-  private AtomicLong minBackoff;
-
-  private AtomicLong maxBackoff;
-
-  private AtomicLong totalRequests;
-
-  private AtomicLong totalBackoff;
-
-  private String retryCount;
-
-  private AtomicLong numberOfIOPSThrottledRequests;
-
-  private AtomicLong numberOfBandwidthThrottledRequests;
-
-  private AtomicLong numberOfOtherThrottledRequests;
-
-  private AtomicLong numberOfNetworkFailedRequests;
-
-  private AtomicLong maxRetryCount;
-
-  private AtomicLong totalNumberOfRequests;
-
-  private AtomicLong numberOfRequestsSucceededWithoutRetrying;
-
-  private AtomicLong numberOfRequestsFailed;
-
-  private final Map<String, AbfsBackoffMetrics> metricsMap
-      = new ConcurrentHashMap<>();
-
-  public AbfsBackoffMetrics() {
-    initializeMap();
-    this.numberOfIOPSThrottledRequests = new AtomicLong();
-    this.numberOfBandwidthThrottledRequests = new AtomicLong();
-    this.numberOfOtherThrottledRequests = new AtomicLong();
-    this.totalNumberOfRequests = new AtomicLong();
-    this.maxRetryCount = new AtomicLong();
-    this.numberOfRequestsSucceededWithoutRetrying = new AtomicLong();
-    this.numberOfRequestsFailed = new AtomicLong();
-    this.numberOfNetworkFailedRequests = new AtomicLong();
-  }
-
-  public AbfsBackoffMetrics(String retryCount) {
-    this.retryCount = retryCount;
-    this.numberOfRequestsSucceeded = new AtomicLong();
-    this.minBackoff = new AtomicLong(Long.MAX_VALUE);
-    this.maxBackoff = new AtomicLong();
-    this.totalRequests = new AtomicLong();
-    this.totalBackoff = new AtomicLong();
-  }
-
-  private void initializeMap() {
-    ArrayList<String> retryCountList = new ArrayList<String>(
-        Arrays.asList("1", "2", "3", "4", "5_15", "15_25", "25AndAbove"));
-    for (String s : retryCountList) {
-      metricsMap.put(s, new AbfsBackoffMetrics(s));
-    }
-  }
-
-  public long getNumberOfRequestsSucceeded() {
-    return this.numberOfRequestsSucceeded.get();
-  }
-
-  public void setNumberOfRequestsSucceeded(long numberOfRequestsSucceeded) {
-    this.numberOfRequestsSucceeded.set(numberOfRequestsSucceeded);
-  }
-
-  public void incrementNumberOfRequestsSucceeded() {
-    this.numberOfRequestsSucceeded.getAndIncrement();
-  }
-
-  public long getMinBackoff() {
-    return this.minBackoff.get();
-  }
-
-  public void setMinBackoff(long minBackoff) {
-    this.minBackoff.set(minBackoff);
-  }
-
-  public long getMaxBackoff() {
-    return this.maxBackoff.get();
-  }
-
-  public void setMaxBackoff(long maxBackoff) {
-    this.maxBackoff.set(maxBackoff);
-  }
-
-  public long getTotalRequests() {
-    return this.totalRequests.get();
-  }
-
-  public void incrementTotalRequests() {
-    this.totalRequests.incrementAndGet();
-  }
-
-  public void setTotalRequests(long totalRequests) {
-    this.totalRequests.set(totalRequests);
-  }
-
-  public long getTotalBackoff() {
-    return this.totalBackoff.get();
-  }
-
-  public void setTotalBackoff(long totalBackoff) {
-    this.totalBackoff.set(totalBackoff);
-  }
-
-  public String getRetryCount() {
-    return this.retryCount;
-  }
-
-  public long getNumberOfIOPSThrottledRequests() {
-    return this.numberOfIOPSThrottledRequests.get();
-  }
-
-  public void setNumberOfIOPSThrottledRequests(long numberOfIOPSThrottledRequests) {
-    this.numberOfIOPSThrottledRequests.set(numberOfIOPSThrottledRequests);
-  }
-
-  public void incrementNumberOfIOPSThrottledRequests() {
-    this.numberOfIOPSThrottledRequests.getAndIncrement();
-  }
-
-  public long getNumberOfBandwidthThrottledRequests() {
-    return this.numberOfBandwidthThrottledRequests.get();
-  }
-
-  public void setNumberOfBandwidthThrottledRequests(long numberOfBandwidthThrottledRequests) {
-    this.numberOfBandwidthThrottledRequests.set(numberOfBandwidthThrottledRequests);
-  }
-
-  public void incrementNumberOfBandwidthThrottledRequests() {
-    this.numberOfBandwidthThrottledRequests.getAndIncrement();
-  }
-
-  public long getNumberOfOtherThrottledRequests() {
-    return this.numberOfOtherThrottledRequests.get();
-  }
-
-  public void setNumberOfOtherThrottledRequests(long numberOfOtherThrottledRequests) {
-    this.numberOfOtherThrottledRequests.set(numberOfOtherThrottledRequests);
-  }
-
-  public void incrementNumberOfOtherThrottledRequests() {
-    this.numberOfOtherThrottledRequests.getAndIncrement();
-  }
-
-  public long getMaxRetryCount() {
-    return this.maxRetryCount.get();
-  }
-
-  public void setMaxRetryCount(long maxRetryCount) {
-    this.maxRetryCount.set(maxRetryCount);
-  }
-
-  public void incrementMaxRetryCount() {
-    this.maxRetryCount.getAndIncrement();
-  }
-
-  public long getTotalNumberOfRequests() {
-    return this.totalNumberOfRequests.get();
-  }
-
-  public void setTotalNumberOfRequests(long totalNumberOfRequests) {
-    this.totalNumberOfRequests.set(totalNumberOfRequests);
-  }
-
-  public void incrementTotalNumberOfRequests() {
-    this.totalNumberOfRequests.getAndIncrement();
-  }
-
-  public Map<String, AbfsBackoffMetrics> getMetricsMap() {
-    return metricsMap;
-  }
-
-  public long getNumberOfRequestsSucceededWithoutRetrying() {
-    return this.numberOfRequestsSucceededWithoutRetrying.get();
-  }
-
-  public void setNumberOfRequestsSucceededWithoutRetrying(long numberOfRequestsSucceededWithoutRetrying) {
-    this.numberOfRequestsSucceededWithoutRetrying.set(numberOfRequestsSucceededWithoutRetrying);
-  }
-
-  public void incrementNumberOfRequestsSucceededWithoutRetrying() {
-    this.numberOfRequestsSucceededWithoutRetrying.getAndIncrement();
-  }
-
-  public long getNumberOfRequestsFailed() {
-    return this.numberOfRequestsFailed.get();
-  }
-
-  public void setNumberOfRequestsFailed(long numberOfRequestsFailed) {
-    this.numberOfRequestsFailed.set(numberOfRequestsFailed);
-  }
-
-  public void incrementNumberOfRequestsFailed() {
-    this.numberOfRequestsFailed.getAndIncrement();
-  }
-
-  public long getNumberOfNetworkFailedRequests() {
-    return this.numberOfNetworkFailedRequests.get();
-  }
-
-  public void setNumberOfNetworkFailedRequests(long numberOfNetworkFailedRequests) {
-    this.numberOfNetworkFailedRequests.set(numberOfNetworkFailedRequests);
-  }
-
-  public void incrementNumberOfNetworkFailedRequests() {
-    this.numberOfNetworkFailedRequests.getAndIncrement();
-  }
-
-  /*
-          Acronyms :-
-          1.RCTSI :- Request count that succeeded in x retries
-          2.MMA :- Min Max Average (This refers to the backoff or sleep time between 2 requests)
-          3.s :- seconds
-          4.BWT :- Number of Bandwidth throttled requests
-          5.IT :- Number of IOPS throttled requests
-          6.OT :- Number of Other throttled requests
-          7.NFR :- Number of requests which failed due to network errors
-          8.%RT :- Percentage of requests that are throttled
-          9.TRNR :- Total number of requests which succeeded without retrying
-          10.TRF :- Total number of requests which failed
-          11.TR :- Total number of requests which were made
-          12.MRC :- Max retry count across all requests
-           */
-  @Override
-  public String toString() {
-    StringBuilder metricString = new StringBuilder();
-    long totalRequestsThrottled = getNumberOfBandwidthThrottledRequests()
-        + getNumberOfIOPSThrottledRequests()
-        + getNumberOfOtherThrottledRequests();
-    double percentageOfRequestsThrottled =
-        ((double) totalRequestsThrottled / getTotalNumberOfRequests()) * HUNDRED;
-    for (Map.Entry<String, AbfsBackoffMetrics> entry : metricsMap.entrySet()) {
-      metricString.append("$RCTSI$_").append(entry.getKey())
-          .append("R_").append("=")
-          .append(entry.getValue().getNumberOfRequestsSucceeded());
-      long totalRequests = entry.getValue().getTotalRequests();
-      if (totalRequests > 0) {
-        metricString.append("$MMA$_").append(entry.getKey())
-            .append("R_").append("=")
-            .append(String.format("%.3f",
-                (double) entry.getValue().getMinBackoff() / THOUSAND))
-            .append("s")
-            .append(String.format("%.3f",
-                (double) entry.getValue().getMaxBackoff() / THOUSAND))
-            .append("s")
-            .append(String.format("%.3f",
-                ((double) entry.getValue().getTotalBackoff() / totalRequests)
-                    / THOUSAND))
-            .append("s");
-      } else {
-        metricString.append("$MMA$_").append(entry.getKey())
-            .append("R_").append("=0s");
-      }
-    }
-    metricString.append("$BWT=")
-        .append(getNumberOfBandwidthThrottledRequests())
-        .append("$IT=")
-        .append(getNumberOfIOPSThrottledRequests())
-        .append("$OT=")
-        .append(getNumberOfOtherThrottledRequests())
-        .append("$RT=")
-        .append(String.format("%.3f", percentageOfRequestsThrottled))
-        .append("$NFR=")
-        .append(getNumberOfNetworkFailedRequests())
-        .append("$TRNR=")
-        .append(getNumberOfRequestsSucceededWithoutRetrying())
-        .append("$TRF=")
-        .append(getNumberOfRequestsFailed())
-        .append("$TR=")
-        .append(getTotalNumberOfRequests())
-        .append("$MRC=")
-        .append(getMaxRetryCount());
-
-    return metricString + "";
-  }
-}
-

+ 5 - 7
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsCountersImpl.java

@@ -24,6 +24,7 @@ import java.util.UUID;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicLong;
 
 
 import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.azurebfs.services.AbfsBackoffMetrics;
 import org.apache.hadoop.fs.azurebfs.services.AbfsCounters;
 import org.apache.hadoop.fs.azurebfs.services.AbfsCounters;
 import org.apache.hadoop.fs.azurebfs.services.AbfsReadFooterMetrics;
 import org.apache.hadoop.fs.azurebfs.services.AbfsReadFooterMetrics;
 import org.apache.hadoop.fs.azurebfs.utils.MetricFormat;
 import org.apache.hadoop.fs.azurebfs.utils.MetricFormat;
@@ -69,6 +70,7 @@ import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.RENAME_RECOVERY;
 import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.SEND_REQUESTS;
 import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.SEND_REQUESTS;
 import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.SERVER_UNAVAILABLE;
 import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.SERVER_UNAVAILABLE;
 import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.WRITE_THROTTLES;
 import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.WRITE_THROTTLES;
+import static org.apache.hadoop.fs.azurebfs.enums.AbfsBackoffMetricsEnum.TOTAL_NUMBER_OF_REQUESTS;
 import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore;
 import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore;
 import static org.apache.hadoop.util.Time.now;
 import static org.apache.hadoop.util.Time.now;
 
 
@@ -324,18 +326,14 @@ public class AbfsCountersImpl implements AbfsCounters {
   public String toString() {
   public String toString() {
     String metric = "";
     String metric = "";
     if (abfsBackoffMetrics != null) {
     if (abfsBackoffMetrics != null) {
-      long totalNoRequests = getAbfsBackoffMetrics().getTotalNumberOfRequests();
+      long totalNoRequests = getAbfsBackoffMetrics().getMetricValue(TOTAL_NUMBER_OF_REQUESTS);
       if (totalNoRequests > 0) {
       if (totalNoRequests > 0) {
         metric += "#BO:" + getAbfsBackoffMetrics().toString();
         metric += "#BO:" + getAbfsBackoffMetrics().toString();
       }
       }
     }
     }
     if (abfsReadFooterMetrics != null) {
     if (abfsReadFooterMetrics != null) {
-      Map<String, AbfsReadFooterMetrics> metricsMap = getAbfsReadFooterMetrics().getMetricsMap();
-      if (metricsMap != null && !(metricsMap.isEmpty())) {
-        String readFooterMetric = getAbfsReadFooterMetrics().toString();
-        if (!readFooterMetric.equals("")) {
-          metric += "#FO:" + getAbfsReadFooterMetrics().toString();
-        }
+      if (getAbfsReadFooterMetrics().getTotalFiles() > 0) {
+        metric += "#FO:" + getAbfsReadFooterMetrics().toString();
       }
       }
     }
     }
     return metric;
     return metric;

+ 121 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/MetricsConstants.java

@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.constants;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Responsible to keep all constant keys related to ABFS metrics.
+ */
+@InterfaceAudience.Private
+public final class MetricsConstants {
+    /**
+     * Type of ABFS Backoff Metrics: Base or Retry
+     */
+    public static final String RETRY = "RETRY";
+    /**
+     * Type of ABFS Backoff Metrics: Base or Retry
+     */
+    public static final String BASE = "BASE";
+    /**
+     * Type of ABFS Readfooter Metrics: File
+     */
+    public static final String FILE = "FILE";
+    /**
+     * Precision Format for double data type
+     */
+    public static final String DOUBLE_PRECISION_FORMAT = "%.3f";
+    /**
+     * Request count that succeeded in x retries
+     */
+    public static final String REQUEST_COUNT = "$RCTSI$_";
+    /**
+     * Min Max Average (This refers to the backoff or sleep time between 2 requests)
+     */
+    public static final String MIN_MAX_AVERAGE = "$MMA$_";
+    /**
+     * Time unit: Seconds
+     */
+    public static final String SECONDS = "s";
+    /**
+     * Number of requests with x retries
+     */
+    public static final String REQUESTS = "R=";
+    /**
+     * Number of Bandwidth throttled requests
+     */
+    public static final String BANDWIDTH_THROTTLED_REQUESTS = "$BWT=";
+    /**
+     * Number of IOPS throttled requests
+     */
+    public static final String IOPS_THROTTLED_REQUESTS = "$IT=";
+    /**
+     * Number of Other throttled requests
+     */
+    public static final String OTHER_THROTTLED_REQUESTS = "$OT=";
+    /**
+     * Percentage of requests that are throttled
+     */
+    public static final String PERCENTAGE_THROTTLED_REQUESTS = "$RT=";
+    /**
+     * Number of requests which failed due to network errors
+     */
+    public static final String NETWORK_ERROR_REQUESTS = "$NFR=";
+    /**
+     * Total number of requests which succeeded without retrying
+     */
+    public static final String SUCCESS_REQUESTS_WITHOUT_RETRY = "$TRNR=";
+    /**
+     * Total number of requests which failed
+     */
+    public static final String FAILED_REQUESTS = "$TRF=";
+    /**
+     * Total number of requests which were made
+     */
+    public static final String TOTAL_REQUESTS_COUNT = "$TR=";
+    /**
+     * Max retry count across all requests
+     */
+    public static final String MAX_RETRY = "$MRC=";
+    /**
+     * Special character: Dollar
+     */
+    public static final String CHAR_DOLLAR = "$";
+    /**
+     * String to represent the average first read
+     */
+    public static final String FIRST_READ = ":$FR=";
+    /**
+     * String to represent the average second read
+     */
+    public static final String SECOND_READ = "$SR=";
+    /**
+     * String to represent the average file length
+     */
+    public static final String FILE_LENGTH = "$FL=";
+    /**
+     * String to represent the average read length
+     */
+    public static final String READ_LENGTH = "$RL=";
+
+    // Private constructor to prevent instantiation
+    private MetricsConstants() {
+        throw new AssertionError("Cannot instantiate MetricsConstants");
+    }
+}

+ 110 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/enums/AbfsBackoffMetricsEnum.java

@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.enums;
+
+import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.BASE;
+import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.RETRY;
+import static org.apache.hadoop.fs.azurebfs.enums.StatisticTypeEnum.TYPE_COUNTER;
+import static org.apache.hadoop.fs.azurebfs.enums.StatisticTypeEnum.TYPE_GAUGE;
+
+/**
+ * Enum representing various ABFS backoff metrics
+ */
+public enum AbfsBackoffMetricsEnum {
+    NUMBER_OF_IOPS_THROTTLED_REQUESTS("numberOfIOPSThrottledRequests",
+            "Number of IOPS throttled requests", BASE, TYPE_COUNTER),
+    NUMBER_OF_BANDWIDTH_THROTTLED_REQUESTS("numberOfBandwidthThrottledRequests",
+            "Number of bandwidth throttled requests", BASE, TYPE_COUNTER),
+    NUMBER_OF_OTHER_THROTTLED_REQUESTS("numberOfOtherThrottledRequests",
+            "Number of other throttled requests", BASE, TYPE_COUNTER),
+    NUMBER_OF_NETWORK_FAILED_REQUESTS("numberOfNetworkFailedRequests",
+            "Number of network failed requests", BASE, TYPE_COUNTER),
+    MAX_RETRY_COUNT("maxRetryCount", "Max retry count", BASE, TYPE_COUNTER),
+    TOTAL_NUMBER_OF_REQUESTS("totalNumberOfRequests",
+            "Total number of requests", BASE, TYPE_COUNTER),
+    NUMBER_OF_REQUESTS_SUCCEEDED_WITHOUT_RETRYING("numberOfRequestsSucceededWithoutRetrying",
+            "Number of requests succeeded without retrying", BASE, TYPE_COUNTER),
+    NUMBER_OF_REQUESTS_FAILED("numberOfRequestsFailed",
+            "Number of requests failed", BASE, TYPE_COUNTER),
+    NUMBER_OF_REQUESTS_SUCCEEDED("numberOfRequestsSucceeded",
+            "Number of requests succeeded", RETRY, TYPE_COUNTER),
+    MIN_BACK_OFF("minBackOff", "Minimum backoff", RETRY, TYPE_GAUGE),
+    MAX_BACK_OFF("maxBackOff", "Maximum backoff", RETRY, TYPE_GAUGE),
+    TOTAL_BACK_OFF("totalBackoff", "Total backoff", RETRY, TYPE_GAUGE),
+    TOTAL_REQUESTS("totalRequests", "Total requests", RETRY, TYPE_COUNTER);
+
+    private final String name;
+    private final String description;
+    private final String type;
+    private final StatisticTypeEnum statisticType;
+
+    /**
+     * Constructor for AbfsBackoffMetricsEnum.
+     *
+     * @param name the name of the metric
+     * @param description the description of the metric
+     * @param type the type of the metric (BASE or RETRY)
+     * @param statisticType the statistic type of the metric (counter or gauge)
+     */
+    AbfsBackoffMetricsEnum(String name,
+                           String description,
+                           String type,
+                           StatisticTypeEnum statisticType) {
+        this.name = name;
+        this.description = description;
+        this.type = type;
+        this.statisticType = statisticType;
+    }
+
+    /**
+     * Gets the name of the metric.
+     *
+     * @return the name of the metric
+     */
+    public String getName() {
+        return name;
+    }
+
+    /**
+     * Gets the description of the metric.
+     *
+     * @return the description of the metric
+     */
+    public String getDescription() {
+        return description;
+    }
+
+    /**
+     * Gets the type of the metric.
+     *
+     * @return the type of the metric
+     */
+    public String getType() {
+        return type;
+    }
+
+    /**
+     * Gets the statistic type of the metric.
+     *
+     * @return the statistic type of the metric
+     */
+    public StatisticTypeEnum getStatisticType() {
+        return statisticType;
+    }
+}

+ 97 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/enums/AbfsReadFooterMetricsEnum.java

@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.enums;
+
+import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.FILE;
+import static org.apache.hadoop.fs.azurebfs.enums.StatisticTypeEnum.TYPE_COUNTER;
+import static org.apache.hadoop.fs.azurebfs.enums.StatisticTypeEnum.TYPE_MEAN;
+
+/**
+ * Enum representing various ABFS read footer metrics.
+ */
+public enum AbfsReadFooterMetricsEnum {
+    TOTAL_FILES("totalFiles", "Total files read", FILE,  TYPE_COUNTER),
+    AVG_FILE_LENGTH("avgFileLength", "Average File length", FILE, TYPE_MEAN),
+    AVG_SIZE_READ_BY_FIRST_READ("avgSizeReadByFirstRead",
+            "Average Size read by first read", FILE, TYPE_MEAN),
+    AVG_OFFSET_DIFF_BETWEEN_FIRST_AND_SECOND_READ("avgOffsetDiffBetweenFirstAndSecondRead",
+            "Average Offset difference between first and second read", FILE, TYPE_MEAN),
+    AVG_READ_LEN_REQUESTED("avgReadLenRequested", "Average Read length requested", FILE, TYPE_MEAN),
+    AVG_FIRST_OFFSET_DIFF("avgFirstOffsetDiff", "Average First offset difference", FILE, TYPE_MEAN),
+    AVG_SECOND_OFFSET_DIFF("avgSecondOffsetDiff", "Average Second offset difference", FILE, TYPE_MEAN);
+
+    private final String name;
+    private final String description;
+    private final String type;
+    private final StatisticTypeEnum statisticType;
+
+    /**
+     * Constructor for AbfsReadFooterMetricsEnum.
+     *
+     * @param name the name of the metric
+     * @param description the description of the metric
+     * @param type the type of the metric (FILE)
+     * @param statisticType the statistic type of the metric (counter or gauge)
+     */
+    AbfsReadFooterMetricsEnum(String name,
+                              String description,
+                              String type,
+                              StatisticTypeEnum statisticType) {
+        this.name = name;
+        this.description = description;
+        this.type = type;
+        this.statisticType = statisticType;
+    }
+
+    /**
+     * Gets the name of the metric.
+     *
+     * @return the name of the metric
+     */
+    public String getName() {
+        return name;
+    }
+
+    /**
+     * Gets the description of the metric.
+     *
+     * @return the description of the metric
+     */
+    public String getDescription() {
+        return description;
+    }
+
+    /**
+     * Gets the type of the metric.
+     *
+     * @return the type of the metric
+     */
+    public String getType() {
+        return type;
+    }
+
+    /**
+     * Gets the statistic type of the metric.
+     *
+     * @return the statistic type of the metric
+     */
+    public StatisticTypeEnum getStatisticType() {
+        return statisticType;
+    }
+}

+ 36 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/enums/FileType.java

@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.enums;
+
+import org.apache.hadoop.fs.azurebfs.services.AbfsReadFooterMetrics;
+
+/**
+ * Enum for file types.
+ * Used in {@link AbfsReadFooterMetrics} to store metrics based on file type.
+ */
+public enum FileType {
+    /**
+     * Parquet file.
+     */
+    PARQUET,
+    /**
+     * Non-parquet file.
+     */
+    NON_PARQUET
+}

+ 83 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/enums/RetryValue.java

@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.enums;
+
+import org.apache.hadoop.fs.azurebfs.services.AbfsBackoffMetrics;
+
+/**
+ * Enum for retry values.
+ * Used in {@link AbfsBackoffMetrics} to store metrics based on the retry count.
+ */
+public enum RetryValue {
+    ONE("1"),
+    TWO("2"),
+    THREE("3"),
+    FOUR("4"),
+    FIVE_FIFTEEN("5_15"),
+    FIFTEEN_TWENTY_FIVE("15_25"),
+    TWENTY_FIVE_AND_ABOVE("25AndAbove");
+
+    private static final int FIVE = 5;
+    private static final int FIFTEEN = 15;
+    private static final int TWENTY_FIVE = 25;
+
+    private final String value;
+
+    /**
+     * Constructor for RetryValue enum.
+     *
+     * @param value the string representation of the retry value
+     */
+    RetryValue(String value) {
+        this.value = value;
+    }
+
+    /**
+     * Gets the string representation of the retry value.
+     *
+     * @return the string representation of the retry value
+     */
+    public String getValue() {
+        return value;
+    }
+
+    /**
+     * Gets the RetryValue enum based on the retry count.
+     *
+     * @param retryCount the retry count
+     * @return the corresponding RetryValue enum
+     */
+    public static RetryValue getRetryValue(int retryCount) {
+        if (retryCount == 1) {
+            return ONE;
+        } else if (retryCount == 2) {
+            return TWO;
+        } else if (retryCount == 3) {
+            return THREE;
+        } else if (retryCount == 4) {
+            return FOUR;
+        } else if (retryCount >= FIVE && retryCount < FIFTEEN) {
+            return FIVE_FIFTEEN;
+        } else if (retryCount >= FIFTEEN && retryCount < TWENTY_FIVE) {
+            return FIFTEEN_TWENTY_FIVE;
+        } else {
+            return TWENTY_FIVE_AND_ABOVE;
+        }
+    }
+}

+ 37 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/enums/StatisticTypeEnum.java

@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.enums;
+
+/**
+ * Enum for statistic types.
+ */
+public enum StatisticTypeEnum {
+    /**
+     * Counter.
+     */
+    TYPE_COUNTER,
+    /**
+     * Gauge.
+     */
+    TYPE_GAUGE,
+    /**
+     * Mean.
+     */
+    TYPE_MEAN
+}

+ 323 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBackoffMetrics.java

@@ -0,0 +1,323 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Stream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.azurebfs.enums.AbfsBackoffMetricsEnum;
+import org.apache.hadoop.fs.azurebfs.enums.RetryValue;
+import org.apache.hadoop.fs.azurebfs.enums.StatisticTypeEnum;
+import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
+
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COLON;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EQUAL;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.THOUSAND;
+import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.DOUBLE_PRECISION_FORMAT;
+import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.RETRY;
+import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.REQUESTS;
+import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.REQUEST_COUNT;
+import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.SECONDS;
+import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.MIN_MAX_AVERAGE;
+import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.BANDWIDTH_THROTTLED_REQUESTS;
+import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.IOPS_THROTTLED_REQUESTS;
+import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.OTHER_THROTTLED_REQUESTS;
+import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.PERCENTAGE_THROTTLED_REQUESTS;
+import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.NETWORK_ERROR_REQUESTS;
+import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.SUCCESS_REQUESTS_WITHOUT_RETRY;
+import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.FAILED_REQUESTS;
+import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.TOTAL_REQUESTS_COUNT;
+import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.MAX_RETRY;
+import static org.apache.hadoop.fs.azurebfs.enums.AbfsBackoffMetricsEnum.MAX_BACK_OFF;
+import static org.apache.hadoop.fs.azurebfs.enums.AbfsBackoffMetricsEnum.MIN_BACK_OFF;
+import static org.apache.hadoop.fs.azurebfs.enums.AbfsBackoffMetricsEnum.MAX_RETRY_COUNT;
+import static org.apache.hadoop.fs.azurebfs.enums.AbfsBackoffMetricsEnum.NUMBER_OF_BANDWIDTH_THROTTLED_REQUESTS;
+import static org.apache.hadoop.fs.azurebfs.enums.AbfsBackoffMetricsEnum.NUMBER_OF_IOPS_THROTTLED_REQUESTS;
+import static org.apache.hadoop.fs.azurebfs.enums.AbfsBackoffMetricsEnum.NUMBER_OF_NETWORK_FAILED_REQUESTS;
+import static org.apache.hadoop.fs.azurebfs.enums.AbfsBackoffMetricsEnum.NUMBER_OF_OTHER_THROTTLED_REQUESTS;
+import static org.apache.hadoop.fs.azurebfs.enums.AbfsBackoffMetricsEnum.NUMBER_OF_REQUESTS_FAILED;
+import static org.apache.hadoop.fs.azurebfs.enums.AbfsBackoffMetricsEnum.NUMBER_OF_REQUESTS_SUCCEEDED;
+import static org.apache.hadoop.fs.azurebfs.enums.AbfsBackoffMetricsEnum.NUMBER_OF_REQUESTS_SUCCEEDED_WITHOUT_RETRYING;
+import static org.apache.hadoop.fs.azurebfs.enums.AbfsBackoffMetricsEnum.TOTAL_BACK_OFF;
+import static org.apache.hadoop.fs.azurebfs.enums.AbfsBackoffMetricsEnum.TOTAL_REQUESTS;
+import static org.apache.hadoop.fs.azurebfs.enums.AbfsBackoffMetricsEnum.TOTAL_NUMBER_OF_REQUESTS;
+import static org.apache.hadoop.fs.azurebfs.enums.StatisticTypeEnum.TYPE_COUNTER;
+import static org.apache.hadoop.fs.azurebfs.enums.StatisticTypeEnum.TYPE_GAUGE;
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore;
+import static org.apache.hadoop.util.StringUtils.format;
+import static org.apache.hadoop.util.StringUtils.formatPercent;
+
+/**
+ * This class is responsible for tracking and
+ * updating metrics related to backoff and
+ * retry operations in Azure Blob File System (ABFS).
+ */
+public class AbfsBackoffMetrics extends AbstractAbfsStatisticsSource {
+  private static final Logger LOG = LoggerFactory.getLogger(AbfsBackoffMetrics.class);
+  private static final List<RetryValue> RETRY_LIST = Arrays.asList(
+          RetryValue.values());
+
+  /**
+   * Constructor to initialize the IOStatisticsStore with counters and gauges.
+   */
+  public AbfsBackoffMetrics() {
+    IOStatisticsStore ioStatisticsStore = iostatisticsStore()
+            .withCounters(getMetricNames(TYPE_COUNTER))
+            .withGauges(getMetricNames(TYPE_GAUGE))
+            .build();
+    setIOStatistics(ioStatisticsStore);
+  }
+
+  /**
+   * Retrieves the metric names based on the statistic type.
+   *
+   * @param type the type of the statistic (counter or gauge)
+   * @return an array of metric names
+   */
+  private String[] getMetricNames(StatisticTypeEnum type) {
+    return Arrays.stream(AbfsBackoffMetricsEnum.values())
+            .filter(backoffMetricsEnum -> backoffMetricsEnum
+                    .getStatisticType()
+                    .equals(type))
+            .flatMap(backoffMetricsEnum ->
+                    RETRY.equals(backoffMetricsEnum.getType())
+                            ? RETRY_LIST.stream().map(retryCount ->
+                            getMetricName(backoffMetricsEnum, retryCount))
+                            : Stream.of(backoffMetricsEnum.getName())
+            ).toArray(String[]::new);
+  }
+
+  /**
+   * Constructs the metric name based on the metric and retry value.
+   *
+   * @param metric the metric enum
+   * @param retryValue the retry value
+   * @return the constructed metric name
+   */
+  private String getMetricName(AbfsBackoffMetricsEnum metric, RetryValue retryValue) {
+    if (metric == null) {
+      LOG.error("ABFS Backoff Metric should not be null");
+      return EMPTY_STRING;
+    }
+    if (RETRY.equals(metric.getType()) && retryValue != null) {
+      return retryValue.getValue() + COLON + metric.getName();
+    }
+    return metric.getName();
+  }
+
+  /**
+   * Retrieves the value of a specific metric.
+   *
+   * @param metric the metric enum
+   * @param retryValue the retry value
+   * @return the value of the metric
+   */
+  public long getMetricValue(AbfsBackoffMetricsEnum metric, RetryValue retryValue) {
+    String metricName = getMetricName(metric, retryValue);
+    switch (metric.getStatisticType()) {
+      case TYPE_COUNTER:
+        return lookupCounterValue(metricName);
+      case TYPE_GAUGE:
+        return lookupGaugeValue(metricName);
+      default:
+        return 0;
+    }
+  }
+
+  /**
+   * Retrieves the value of a specific metric.
+   *
+   * @param metric the metric enum
+   * @return the value of the metric
+   */
+  public long getMetricValue(AbfsBackoffMetricsEnum metric) {
+    return getMetricValue(metric, null);
+  }
+
+  /**
+   * Increments the value of a specific metric.
+   *
+   * @param metric the metric enum
+   * @param retryValue the retry value
+   */
+  public void incrementMetricValue(AbfsBackoffMetricsEnum metric, RetryValue retryValue) {
+    String metricName = getMetricName(metric, retryValue);
+    switch (metric.getStatisticType()) {
+      case TYPE_COUNTER:
+        incCounterValue(metricName);
+        break;
+      case TYPE_GAUGE:
+        incGaugeValue(metricName);
+        break;
+      default:
+        // Do nothing
+        break;
+    }
+  }
+
+  /**
+   * Increments the value of a specific metric.
+   *
+   * @param metric the metric enum
+   */
+  public void incrementMetricValue(AbfsBackoffMetricsEnum metric) {
+    incrementMetricValue(metric, null);
+  }
+
+  /**
+   * Sets the value of a specific metric.
+   *
+   * @param metric the metric enum
+   * @param value the new value of the metric
+   * @param retryValue the retry value
+   */
+  public void setMetricValue(AbfsBackoffMetricsEnum metric, long value, RetryValue retryValue) {
+    String metricName = getMetricName(metric, retryValue);
+    switch (metric.getStatisticType()) {
+      case TYPE_COUNTER:
+        setCounterValue(metricName, value);
+        break;
+      case TYPE_GAUGE:
+        setGaugeValue(metricName, value);
+        break;
+      default:
+        // Do nothing
+        break;
+    }
+  }
+
+  /**
+   * Sets the value of a specific metric.
+   *
+   * @param metric the metric enum
+   * @param value the new value of the metric
+   */
+  public void setMetricValue(AbfsBackoffMetricsEnum metric, long value) {
+    setMetricValue(metric, value, null);
+  }
+
+  /**
+   * Get the precision metrics.
+   *
+   * @param metricName the metric name
+   * @param retryCount the retry count
+   * @param denominator the denominator
+   * @return String metrics value with precision
+   */
+  private String getPrecisionMetrics(AbfsBackoffMetricsEnum metricName,
+                                     RetryValue retryCount,
+                                     long denominator) {
+    return format(DOUBLE_PRECISION_FORMAT, (double) getMetricValue(metricName, retryCount) / denominator);
+  }
+
+  /**
+   * Retrieves the retry metrics.
+   *
+   * @param metricBuilder the string builder to append the metrics
+   */
+  private void getRetryMetrics(StringBuilder metricBuilder) {
+    for (RetryValue retryCount : RETRY_LIST) {
+      long totalRequests = getMetricValue(TOTAL_REQUESTS, retryCount);
+      metricBuilder.append(REQUEST_COUNT)
+              .append(retryCount.getValue())
+              .append(REQUESTS)
+              .append(getMetricValue(NUMBER_OF_REQUESTS_SUCCEEDED, retryCount));
+
+      if (totalRequests > 0) {
+        metricBuilder.append(MIN_MAX_AVERAGE)
+                .append(retryCount.getValue())
+                .append(REQUESTS)
+                .append(getPrecisionMetrics(MIN_BACK_OFF, retryCount, THOUSAND))
+                .append(SECONDS)
+                .append(getPrecisionMetrics(MAX_BACK_OFF, retryCount, THOUSAND))
+                .append(SECONDS)
+                .append(getPrecisionMetrics(TOTAL_BACK_OFF, retryCount, totalRequests * THOUSAND))
+                .append(SECONDS);
+      } else {
+        metricBuilder.append(MIN_MAX_AVERAGE)
+                .append(retryCount.getValue())
+                .append(REQUESTS + EQUAL + 0 + SECONDS);
+      }
+    }
+  }
+
+  /**
+   * Retrieves the base metrics.
+   *
+   * @param metricBuilder the string builder to append the metrics
+   */
+  private void getBaseMetrics(StringBuilder metricBuilder) {
+    long totalRequestsThrottled = getMetricValue(NUMBER_OF_NETWORK_FAILED_REQUESTS)
+            + getMetricValue(NUMBER_OF_IOPS_THROTTLED_REQUESTS)
+            + getMetricValue(NUMBER_OF_OTHER_THROTTLED_REQUESTS)
+            + getMetricValue(NUMBER_OF_BANDWIDTH_THROTTLED_REQUESTS);
+
+    metricBuilder.append(BANDWIDTH_THROTTLED_REQUESTS)
+            .append(getMetricValue(NUMBER_OF_BANDWIDTH_THROTTLED_REQUESTS))
+            .append(IOPS_THROTTLED_REQUESTS)
+            .append(getMetricValue(NUMBER_OF_IOPS_THROTTLED_REQUESTS))
+            .append(OTHER_THROTTLED_REQUESTS)
+            .append(getMetricValue(NUMBER_OF_OTHER_THROTTLED_REQUESTS))
+            .append(PERCENTAGE_THROTTLED_REQUESTS)
+            .append(formatPercent(totalRequestsThrottled/ (double) getMetricValue(TOTAL_NUMBER_OF_REQUESTS), 3))
+            .append(NETWORK_ERROR_REQUESTS)
+            .append(getMetricValue(NUMBER_OF_NETWORK_FAILED_REQUESTS))
+            .append(SUCCESS_REQUESTS_WITHOUT_RETRY)
+            .append(getMetricValue(NUMBER_OF_REQUESTS_SUCCEEDED_WITHOUT_RETRYING))
+            .append(FAILED_REQUESTS)
+            .append(getMetricValue(NUMBER_OF_REQUESTS_FAILED))
+            .append(TOTAL_REQUESTS_COUNT)
+            .append(getMetricValue(TOTAL_NUMBER_OF_REQUESTS))
+            .append(MAX_RETRY)
+            .append(getMetricValue(MAX_RETRY_COUNT));
+  }
+
+  /**
+   * Retrieves the string representation of the metrics.
+   *
+   * @return the string representation of the metrics
+   */
+  @Override
+  public String toString() {
+    if (getMetricValue(TOTAL_NUMBER_OF_REQUESTS) == 0) {
+      return EMPTY_STRING;
+    }
+    StringBuilder metricBuilder = new StringBuilder();
+    getRetryMetrics(metricBuilder);
+    getBaseMetrics(metricBuilder);
+    return metricBuilder.toString();
+  }
+
+  /**
+   * Retrieves the metric names based on the statistic type.
+   *
+   * @param type the type of the statistic (counter or gauge)
+   * @return an array of metric names
+   */
+  @VisibleForTesting
+  String[] getMetricNamesByType(StatisticTypeEnum type) {
+    return getMetricNames(type);
+  }
+}

+ 0 - 1
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsCounters.java

@@ -25,7 +25,6 @@ import org.apache.hadoop.classification.VisibleForTesting;
 
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.azurebfs.AbfsBackoffMetrics;
 import org.apache.hadoop.fs.azurebfs.AbfsStatistic;
 import org.apache.hadoop.fs.azurebfs.AbfsStatistic;
 import org.apache.hadoop.fs.azurebfs.utils.MetricFormat;
 import org.apache.hadoop.fs.azurebfs.utils.MetricFormat;
 import org.apache.hadoop.fs.statistics.DurationTracker;
 import org.apache.hadoop.fs.statistics.DurationTracker;

+ 1 - 1
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java

@@ -261,7 +261,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
       // There maybe case that we read less than requested data.
       // There maybe case that we read less than requested data.
       long filePosAtStartOfBuffer = fCursor - limit;
       long filePosAtStartOfBuffer = fCursor - limit;
       if (abfsReadFooterMetrics != null) {
       if (abfsReadFooterMetrics != null) {
-        abfsReadFooterMetrics.checkMetricUpdate(filePathIdentifier, len, contentLength, nextReadPos);
+        abfsReadFooterMetrics.updateReadMetrics(filePathIdentifier, len, contentLength, nextReadPos);
       }
       }
       if (nextReadPos >= filePosAtStartOfBuffer && nextReadPos <= fCursor) {
       if (nextReadPos >= filePosAtStartOfBuffer && nextReadPos <= fCursor) {
         // Determining position in buffer from where data is to be read.
         // Determining position in buffer from where data is to be read.

+ 486 - 498
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsReadFooterMetrics.java

@@ -17,533 +17,521 @@
  */
  */
 package org.apache.hadoop.fs.azurebfs.services;
 package org.apache.hadoop.fs.azurebfs.services;
 
 
-import java.util.ArrayList;
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
-import java.util.StringJoiner;
-import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.Arrays;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Stream;
 
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum;
+import org.apache.hadoop.fs.azurebfs.enums.FileType;
+import org.apache.hadoop.fs.azurebfs.enums.StatisticTypeEnum;
+import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
+
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_UNDERSCORE;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COLON;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB;
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB;
+import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.CHAR_DOLLAR;
+import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.DOUBLE_PRECISION_FORMAT;
+import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.FILE;
+import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.FIRST_READ;
+import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.SECOND_READ;
+import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.FILE_LENGTH;
+import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.READ_LENGTH;
+import static org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.TOTAL_FILES;
+import static org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.AVG_FILE_LENGTH;
+import static org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.AVG_SIZE_READ_BY_FIRST_READ;
+import static org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.AVG_OFFSET_DIFF_BETWEEN_FIRST_AND_SECOND_READ;
+import static org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.AVG_READ_LEN_REQUESTED;
+import static org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.AVG_FIRST_OFFSET_DIFF;
+import static org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.AVG_SECOND_OFFSET_DIFF;
+import static org.apache.hadoop.fs.azurebfs.enums.FileType.PARQUET;
+import static org.apache.hadoop.fs.azurebfs.enums.FileType.NON_PARQUET;
+import static org.apache.hadoop.fs.azurebfs.enums.StatisticTypeEnum.TYPE_COUNTER;
+import static org.apache.hadoop.fs.azurebfs.enums.StatisticTypeEnum.TYPE_MEAN;
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore;
+import static org.apache.hadoop.util.StringUtils.format;
+
+/**
+ * This class is responsible for tracking and updating metrics related to reading footers in files.
+ */
+public class AbfsReadFooterMetrics extends AbstractAbfsStatisticsSource {
+    private static final Logger LOG = LoggerFactory.getLogger(AbfsReadFooterMetrics.class);
+    private static final String FOOTER_LENGTH = "20";
+    private static final List<FileType> FILE_TYPE_LIST =
+            Arrays.asList(FileType.values());
+    private final Map<String, FileTypeMetrics> fileTypeMetricsMap =
+            new ConcurrentHashMap<>();
+
+    /**
+     * Inner class to handle file type checks.
+     */
+    private static final class FileTypeMetrics {
+        private final AtomicBoolean collectMetrics;
+        private final AtomicBoolean collectMetricsForNextRead;
+        private final AtomicBoolean collectLenMetrics;
+        private final AtomicLong readCount;
+        private final AtomicLong offsetOfFirstRead;
+        private FileType fileType = null;
+        private String sizeReadByFirstRead;
+        private String offsetDiffBetweenFirstAndSecondRead;
+
+        /**
+         * Constructor to initialize the file type metrics.
+         */
+        private FileTypeMetrics() {
+            collectMetrics = new AtomicBoolean(false);
+            collectMetricsForNextRead = new AtomicBoolean(false);
+            collectLenMetrics = new AtomicBoolean(false);
+            readCount = new AtomicLong(0);
+            offsetOfFirstRead = new AtomicLong(0);
+        }
+
+        /**
+         * Updates the file type based on the metrics collected.
+         */
+        private void updateFileType() {
+            if (fileType == null) {
+                fileType = collectMetrics.get() && readCount.get() >= 2
+                        && haveEqualValues(sizeReadByFirstRead)
+                        && haveEqualValues(offsetDiffBetweenFirstAndSecondRead) ? PARQUET : NON_PARQUET;
+            }
+        }
+
+        /**
+         * Checks if the given value has equal parts.
+         *
+         * @param value the value to check
+         * @return true if the value has equal parts, false otherwise
+         */
+        private boolean haveEqualValues(String value) {
+            String[] parts = value.split("_");
+            return parts.length == 2
+                    && parts[0].equals(parts[1]);
+        }
+
+        /**
+         * Increments the read count.
+         */
+        private void incrementReadCount() {
+            readCount.incrementAndGet();
+        }
+
+        /**
+         * Returns the read count.
+         *
+         * @return the read count
+         */
+        private long getReadCount() {
+            return readCount.get();
+        }
+
+        /**
+         * Sets the collect metrics flag.
+         *
+         * @param collect the value to set
+         */
+        private void setCollectMetrics(boolean collect) {
+            collectMetrics.set(collect);
+        }
+
+        /**
+         * Returns the collect metrics flag.
+         *
+         * @return the collect metrics flag
+         */
+        private boolean getCollectMetrics() {
+            return collectMetrics.get();
+        }
+
+        /**
+         * Sets the collect metrics for the next read flag.
+         *
+         * @param collect the value to set
+         */
+        private void setCollectMetricsForNextRead(boolean collect) {
+            collectMetricsForNextRead.set(collect);
+        }
+
+        /**
+         * Returns the collect metrics for the next read flag.
+         *
+         * @return the collect metrics for the next read flag
+         */
+        private boolean getCollectMetricsForNextRead() {
+            return collectMetricsForNextRead.get();
+        }
+
+        /**
+         * Returns the collect length metrics flag.
+         *
+         * @return the collect length metrics flag
+         */
+        private boolean getCollectLenMetrics() {
+            return collectLenMetrics.get();
+        }
+
+        /**
+         * Sets the collect length metrics flag.
+         *
+         * @param collect the value to set
+         */
+        private void setCollectLenMetrics(boolean collect) {
+            collectLenMetrics.set(collect);
+        }
+
+        /**
+         * Sets the offset of the first read.
+         *
+         * @param offset the value to set
+         */
+        private void setOffsetOfFirstRead(long offset) {
+            offsetOfFirstRead.set(offset);
+        }
+
+        /**
+         * Returns the offset of the first read.
+         *
+         * @return the offset of the first read
+         */
+        private long getOffsetOfFirstRead() {
+            return offsetOfFirstRead.get();
+        }
+
+        /**
+         * Sets the size read by the first read.
+         *
+         * @param size the value to set
+         */
+        private void setSizeReadByFirstRead(String size) {
+            sizeReadByFirstRead = size;
+        }
+
+        /**
+         * Returns the size read by the first read.
+         *
+         * @return the size read by the first read
+         */
+        private String getSizeReadByFirstRead() {
+            return sizeReadByFirstRead;
+        }
 
 
-public class AbfsReadFooterMetrics {
-  private final AtomicBoolean isParquetFile;
-  private final AtomicBoolean isParquetEvaluated;
-  private final AtomicBoolean isLenUpdated;
-  private String sizeReadByFirstRead;
-  private String offsetDiffBetweenFirstAndSecondRead;
-  private final AtomicLong fileLength;
-  private double avgFileLength;
-  private double avgReadLenRequested;
-  private final AtomicBoolean collectMetrics;
-  private final AtomicBoolean collectMetricsForNextRead;
-  private final AtomicBoolean collectLenMetrics;
-  private final AtomicLong dataLenRequested;
-  private final AtomicLong offsetOfFirstRead;
-  private final AtomicInteger readCount;
-  private final ConcurrentSkipListMap<String, AbfsReadFooterMetrics> metricsMap;
-  private static final String FOOTER_LENGTH = "20";
-
-  public AbfsReadFooterMetrics() {
-    this.isParquetFile = new AtomicBoolean(false);
-    this.isParquetEvaluated = new AtomicBoolean(false);
-    this.isLenUpdated = new AtomicBoolean(false);
-    this.fileLength = new AtomicLong();
-    this.readCount = new AtomicInteger(0);
-    this.offsetOfFirstRead = new AtomicLong();
-    this.collectMetrics = new AtomicBoolean(false);
-    this.collectMetricsForNextRead = new AtomicBoolean(false);
-    this.collectLenMetrics = new AtomicBoolean(false);
-    this.dataLenRequested = new AtomicLong(0);
-    this.metricsMap = new ConcurrentSkipListMap<>();
-  }
-
-  public Map<String, AbfsReadFooterMetrics> getMetricsMap() {
-    return metricsMap;
-  }
-
-  private boolean getIsParquetFile() {
-    return isParquetFile.get();
-  }
-
-  public void setIsParquetFile(boolean isParquetFile) {
-    this.isParquetFile.set(isParquetFile);
-  }
-
-  private String getSizeReadByFirstRead() {
-    return sizeReadByFirstRead;
-  }
-
-  public void setSizeReadByFirstRead(final String sizeReadByFirstRead) {
-    this.sizeReadByFirstRead = sizeReadByFirstRead;
-  }
-
-  private String getOffsetDiffBetweenFirstAndSecondRead() {
-    return offsetDiffBetweenFirstAndSecondRead;
-  }
-
-  public void setOffsetDiffBetweenFirstAndSecondRead(final String offsetDiffBetweenFirstAndSecondRead) {
-    this.offsetDiffBetweenFirstAndSecondRead
-        = offsetDiffBetweenFirstAndSecondRead;
-  }
-
-  private long getFileLength() {
-    return fileLength.get();
-  }
-
-  private void setFileLength(long fileLength) {
-    this.fileLength.set(fileLength);
-  }
-
-  private double getAvgFileLength() {
-    return avgFileLength;
-  }
-
-  public void setAvgFileLength(final double avgFileLength) {
-    this.avgFileLength = avgFileLength;
-  }
-
-  private double getAvgReadLenRequested() {
-    return avgReadLenRequested;
-  }
-
-  public void setAvgReadLenRequested(final double avgReadLenRequested) {
-    this.avgReadLenRequested = avgReadLenRequested;
-  }
-
-  private boolean getCollectMetricsForNextRead() {
-    return collectMetricsForNextRead.get();
-  }
-
-  private void setCollectMetricsForNextRead(boolean collectMetricsForNextRead) {
-    this.collectMetricsForNextRead.set(collectMetricsForNextRead);
-  }
-
-  private long getOffsetOfFirstRead() {
-    return offsetOfFirstRead.get();
-  }
-
-  private void setOffsetOfFirstRead(long offsetOfFirstRead) {
-    this.offsetOfFirstRead.set(offsetOfFirstRead);
-  }
-
-  private int getReadCount() {
-    return readCount.get();
-  }
-
-  private void setReadCount(int readCount) {
-    this.readCount.set(readCount);
-  }
-
-  private int incrementReadCount() {
-    this.readCount.incrementAndGet();
-    return getReadCount();
-  }
-
-  private boolean getCollectLenMetrics() {
-    return collectLenMetrics.get();
-  }
-
-  private void setCollectLenMetrics(boolean collectLenMetrics) {
-    this.collectLenMetrics.set(collectLenMetrics);
-
-  }
-
-  private long getDataLenRequested() {
-    return dataLenRequested.get();
-  }
-
-  private void setDataLenRequested(long dataLenRequested) {
-    this.dataLenRequested.set(dataLenRequested);
-  }
-
-  private void updateDataLenRequested(long dataLenRequested){
-    this.dataLenRequested.addAndGet(dataLenRequested);
-  }
-
-  private boolean getCollectMetrics() {
-    return collectMetrics.get();
-  }
-
-  private void setCollectMetrics(boolean collectMetrics) {
-    this.collectMetrics.set(collectMetrics);
-  }
-
-  private boolean getIsParquetEvaluated() {
-    return isParquetEvaluated.get();
-  }
-
-  private void setIsParquetEvaluated(boolean isParquetEvaluated) {
-    this.isParquetEvaluated.set(isParquetEvaluated);
-  }
-
-  private boolean getIsLenUpdated() {
-    return isLenUpdated.get();
-  }
-
-  private void setIsLenUpdated(boolean isLenUpdated) {
-    this.isLenUpdated.set(isLenUpdated);
-  }
-
-  /**
-   * Updates the metrics map with an entry for the specified file if it doesn't already exist.
-   *
-   * @param filePathIdentifier The unique identifier for the file.
-   */
-  public void updateMap(String filePathIdentifier) {
-    // If the file is not already in the metrics map, add it with a new AbfsReadFooterMetrics object.
-    metricsMap.computeIfAbsent(filePathIdentifier, key -> new AbfsReadFooterMetrics());
-  }
-
-  /**
-   * Checks and updates metrics for a specific file identified by filePathIdentifier.
-   * If the metrics do not exist for the file, they are initialized.
-   *
-   * @param filePathIdentifier The unique identifier for the file.
-   * @param len               The length of the read operation.
-   * @param contentLength     The total content length of the file.
-   * @param nextReadPos       The position of the next read operation.
-   */
-  public void checkMetricUpdate(final String filePathIdentifier, final int len, final long contentLength,
-      final long nextReadPos) {
-    AbfsReadFooterMetrics readFooterMetrics = metricsMap.computeIfAbsent(
-            filePathIdentifier, key -> new AbfsReadFooterMetrics());
-    if (readFooterMetrics.getReadCount() == 0
-        || (readFooterMetrics.getReadCount() >= 1
-        && readFooterMetrics.getCollectMetrics())) {
-      updateMetrics(filePathIdentifier, len, contentLength, nextReadPos);
+        /**
+         * Sets the offset difference between the first and second read.
+         *
+         * @param offsetDiff the value to set
+         */
+        private void setOffsetDiffBetweenFirstAndSecondRead(String offsetDiff) {
+            offsetDiffBetweenFirstAndSecondRead = offsetDiff;
+        }
+
+        /**
+         * Returns the offset difference between the first and second read.
+         *
+         * @return the offset difference between the first and second read
+         */
+        private String getOffsetDiffBetweenFirstAndSecondRead() {
+            return offsetDiffBetweenFirstAndSecondRead;
+        }
+
+        /**
+         * Returns the file type.
+         *
+         * @return the file type
+         */
+        private FileType getFileType() {
+            return fileType;
+        }
     }
     }
-  }
-
-  /**
-   * Updates metrics for a specific file identified by filePathIdentifier.
-   *
-   * @param filePathIdentifier  The unique identifier for the file.
-   * @param len                The length of the read operation.
-   * @param contentLength      The total content length of the file.
-   * @param nextReadPos        The position of the next read operation.
-   */
-  private void updateMetrics(final String filePathIdentifier, final int len, final long contentLength,
-                             final long nextReadPos) {
-    AbfsReadFooterMetrics readFooterMetrics = metricsMap.get(filePathIdentifier);
-
-    // Create a new AbfsReadFooterMetrics object if it doesn't exist in the metricsMap.
-    if (readFooterMetrics == null) {
-      readFooterMetrics = new AbfsReadFooterMetrics();
-      metricsMap.put(filePathIdentifier, readFooterMetrics);
+
+    /**
+     * Constructor to initialize the IOStatisticsStore with counters and mean statistics.
+     */
+    public AbfsReadFooterMetrics() {
+        IOStatisticsStore ioStatisticsStore = iostatisticsStore()
+                .withCounters(getMetricNames(TYPE_COUNTER))
+                .withMeanStatistics(getMetricNames(TYPE_MEAN))
+                .build();
+        setIOStatistics(ioStatisticsStore);
     }
     }
 
 
-    int readCount;
-    synchronized (this) {
-      readCount = readFooterMetrics.incrementReadCount();
+    /**
+     * Returns the metric names for a specific statistic type.
+     *
+     * @param type the statistic type
+     * @return the metric names
+     */
+    private String[] getMetricNames(StatisticTypeEnum type) {
+        return Arrays.stream(AbfsReadFooterMetricsEnum.values())
+                .filter(readFooterMetricsEnum -> readFooterMetricsEnum.getStatisticType().equals(type))
+                .flatMap(readFooterMetricsEnum ->
+                        FILE.equals(readFooterMetricsEnum.getType())
+                                ? FILE_TYPE_LIST.stream().map(fileType ->
+                                getMetricName(fileType, readFooterMetricsEnum))
+                                : Stream.of(readFooterMetricsEnum.getName()))
+                .toArray(String[]::new);
     }
     }
 
 
-    if (readCount == 1) {
-      // Update metrics for the first read.
-      updateMetricsOnFirstRead(readFooterMetrics, nextReadPos, len, contentLength);
+    /**
+     * Returns the metric name for a specific file type and metric.
+     *
+     * @param fileType the type of the file
+     * @param readFooterMetricsEnum the metric to get the name for
+     * @return the metric name
+     */
+    private String getMetricName(FileType fileType,
+                                 AbfsReadFooterMetricsEnum readFooterMetricsEnum) {
+        if (fileType == null || readFooterMetricsEnum == null) {
+            LOG.error("File type or ABFS read footer metrics should not be null");
+            return EMPTY_STRING;
+        }
+        return fileType + COLON + readFooterMetricsEnum.getName();
     }
     }
 
 
-    synchronized (this) {
-      if (readFooterMetrics.getCollectLenMetrics()) {
-        readFooterMetrics.updateDataLenRequested(len);
-      }
+    /**
+     * Looks up the counter value for a specific metric.
+     *
+     * @param fileType the type of the file
+     * @param abfsReadFooterMetricsEnum the metric to look up
+     * @return the counter value
+     */
+    private long getCounterMetricValue(FileType fileType,
+                                       AbfsReadFooterMetricsEnum abfsReadFooterMetricsEnum) {
+        return lookupCounterValue(getMetricName(fileType, abfsReadFooterMetricsEnum));
     }
     }
 
 
-    if (readCount == 2) {
-      // Update metrics for the second read.
-      updateMetricsOnSecondRead(readFooterMetrics, nextReadPos, len);
+    /**
+     * Looks up the mean statistic value for a specific metric.
+     *
+     * @param fileType the type of the file
+     * @param abfsReadFooterMetricsEnum the metric to look up
+     * @return the mean statistic value
+     */
+    private String getMeanMetricValue(FileType fileType,
+                                      AbfsReadFooterMetricsEnum abfsReadFooterMetricsEnum) {
+        return format(DOUBLE_PRECISION_FORMAT,
+                lookupMeanStatistic(getMetricName(fileType, abfsReadFooterMetricsEnum)));
     }
     }
-  }
-
-  /**
-   * Updates metrics for the first read operation.
-   *
-   * @param readFooterMetrics The metrics object to update.
-   * @param nextReadPos       The position of the next read operation.
-   * @param len               The length of the read operation.
-   * @param contentLength     The total content length of the file.
-   */
-  private void updateMetricsOnFirstRead(AbfsReadFooterMetrics readFooterMetrics, long nextReadPos, int len, long contentLength) {
-    if (nextReadPos >= contentLength - (long) Integer.parseInt(FOOTER_LENGTH) * ONE_KB) {
-      readFooterMetrics.setCollectMetrics(true);
-      readFooterMetrics.setCollectMetricsForNextRead(true);
-      readFooterMetrics.setOffsetOfFirstRead(nextReadPos);
-      readFooterMetrics.setSizeReadByFirstRead(len + "_" + Math.abs(contentLength - nextReadPos));
-      readFooterMetrics.setFileLength(contentLength);
+
+    /**
+     * Increments the value of a specific metric.
+     *
+     * @param fileType the type of the file
+     * @param abfsReadFooterMetricsEnum the metric to increment
+     */
+    public void incrementMetricValue(FileType fileType,
+                                     AbfsReadFooterMetricsEnum abfsReadFooterMetricsEnum) {
+        incCounterValue(getMetricName(fileType, abfsReadFooterMetricsEnum));
     }
     }
-  }
-
-  /**
-   * Updates metrics for the second read operation.
-   *
-   * @param readFooterMetrics The metrics object to update.
-   * @param nextReadPos       The position of the next read operation.
-   * @param len               The length of the read operation.
-   */
-  private void updateMetricsOnSecondRead(AbfsReadFooterMetrics readFooterMetrics, long nextReadPos, int len) {
-    if (readFooterMetrics.getCollectMetricsForNextRead()) {
-      long offsetDiff = Math.abs(nextReadPos - readFooterMetrics.getOffsetOfFirstRead());
-      readFooterMetrics.setOffsetDiffBetweenFirstAndSecondRead(len + "_" + offsetDiff);
-      readFooterMetrics.setCollectLenMetrics(true);
+
+    /**
+     * Adds a mean statistic value for a specific metric.
+     *
+     * @param fileType the type of the file
+     * @param abfsReadFooterMetricsEnum the metric to update
+     * @param value the new value of the metric
+     */
+    public void addMeanMetricValue(FileType fileType,
+                                   AbfsReadFooterMetricsEnum abfsReadFooterMetricsEnum,
+                                   long value) {
+        addMeanStatistic(getMetricName(fileType, abfsReadFooterMetricsEnum), value);
     }
     }
-  }
-
-
-  /**
-   * Check if the given file should be marked as a Parquet file.
-   *
-   * @param metrics The metrics to evaluate.
-   * @return True if the file meet the criteria for being marked as a Parquet file, false otherwise.
-   */
-  private boolean shouldMarkAsParquet(AbfsReadFooterMetrics metrics) {
-    return metrics.getCollectMetrics()
-            && metrics.getReadCount() >= 2
-            && !metrics.getIsParquetEvaluated()
-            && haveEqualValues(metrics.getSizeReadByFirstRead())
-            && haveEqualValues(metrics.getOffsetDiffBetweenFirstAndSecondRead());
-  }
-
-  /**
-   * Check if two values are equal, considering they are in the format "value1_value2".
-   *
-   * @param value The value to check.
-   * @return True if the two parts of the value are equal, false otherwise.
-   */
-  private boolean haveEqualValues(String value) {
-    String[] parts = value.split("_");
-    return parts.length == 2 && parts[0].equals(parts[1]);
-  }
-
-  /**
-   * Mark the given metrics as a Parquet file and update related values.
-   *
-   * @param metrics The metrics to mark as Parquet.
-   */
-  private void markAsParquet(AbfsReadFooterMetrics metrics) {
-    metrics.setIsParquetFile(true);
-    String[] parts = metrics.getSizeReadByFirstRead().split("_");
-    metrics.setSizeReadByFirstRead(parts[0]);
-    parts = metrics.getOffsetDiffBetweenFirstAndSecondRead().split("_");
-    metrics.setOffsetDiffBetweenFirstAndSecondRead(parts[0]);
-    metrics.setIsParquetEvaluated(true);
-  }
-
-  /**
-   * Check each metric in the provided map and mark them as Parquet files if they meet the criteria.
-   *
-   * @param metricsMap The map containing metrics to evaluate.
-   */
-  public void checkIsParquet(Map<String, AbfsReadFooterMetrics> metricsMap) {
-    for (Map.Entry<String, AbfsReadFooterMetrics> entry : metricsMap.entrySet()) {
-      AbfsReadFooterMetrics readFooterMetrics = entry.getValue();
-      if (shouldMarkAsParquet(readFooterMetrics)) {
-        markAsParquet(readFooterMetrics);
-        metricsMap.replace(entry.getKey(), readFooterMetrics);
-      }
+
+    /**
+     * Returns the total number of files.
+     *
+     * @return the total number of files
+     */
+    public Long getTotalFiles() {
+        return getCounterMetricValue(PARQUET, TOTAL_FILES) + getCounterMetricValue(NON_PARQUET, TOTAL_FILES);
     }
     }
-  }
-
-  /**
-   * Updates the average read length requested for metrics of all files in the metrics map.
-   * If the metrics indicate that the update is needed, it calculates the average read length and updates the metrics.
-   *
-   * @param metricsMap A map containing metrics for different files with unique identifiers.
-   */
-  private void updateLenRequested(Map<String, AbfsReadFooterMetrics> metricsMap) {
-    for (AbfsReadFooterMetrics readFooterMetrics : metricsMap.values()) {
-      if (shouldUpdateLenRequested(readFooterMetrics)) {
-        int readReqCount = readFooterMetrics.getReadCount() - 2;
-        readFooterMetrics.setAvgReadLenRequested(
-                (double) readFooterMetrics.getDataLenRequested() / readReqCount);
-        readFooterMetrics.setIsLenUpdated(true);
-      }
+
+    /**
+     * Updates the map with a new file path identifier.
+     *
+     * @param filePathIdentifier the file path identifier
+     */
+    public void updateMap(String filePathIdentifier) {
+        fileTypeMetricsMap.computeIfAbsent(filePathIdentifier, key -> new FileTypeMetrics());
     }
     }
-  }
-
-  /**
-   * Checks whether the average read length requested should be updated for the given metrics.
-   *
-   * The method returns true if the following conditions are met:
-   * - Metrics collection is enabled.
-   * - The number of read counts is greater than 2.
-   * - The average read length has not been updated previously.
-   *
-   * @param readFooterMetrics The metrics object to evaluate.
-   * @return True if the average read length should be updated, false otherwise.
-   */
-  private boolean shouldUpdateLenRequested(AbfsReadFooterMetrics readFooterMetrics) {
-    return readFooterMetrics.getCollectMetrics()
-            && readFooterMetrics.getReadCount() > 2
-            && !readFooterMetrics.getIsLenUpdated();
-  }
-
-  /**
-   * Calculates the average metrics from a list of AbfsReadFooterMetrics and sets the values in the provided 'avgParquetReadFooterMetrics' object.
-   *
-   * @param isParquetList The list of AbfsReadFooterMetrics to compute the averages from.
-   * @param avgParquetReadFooterMetrics The target AbfsReadFooterMetrics object to store the computed average values.
-   *
-   * This method calculates various average metrics from the provided list and sets them in the 'avgParquetReadFooterMetrics' object.
-   * The metrics include:
-   * - Size read by the first read
-   * - Offset difference between the first and second read
-   * - Average file length
-   * - Average requested read length
-   */
-  private void getParquetReadFooterMetricsAverage(List<AbfsReadFooterMetrics> isParquetList,
-      AbfsReadFooterMetrics avgParquetReadFooterMetrics){
-    avgParquetReadFooterMetrics.setSizeReadByFirstRead(
-        String.format("%.3f", isParquetList.stream()
-            .map(AbfsReadFooterMetrics::getSizeReadByFirstRead).mapToDouble(
-                Double::parseDouble).average().orElse(0.0)));
-    avgParquetReadFooterMetrics.setOffsetDiffBetweenFirstAndSecondRead(
-        String.format("%.3f", isParquetList.stream()
-            .map(AbfsReadFooterMetrics::getOffsetDiffBetweenFirstAndSecondRead)
-            .mapToDouble(Double::parseDouble).average().orElse(0.0)));
-    avgParquetReadFooterMetrics.setAvgFileLength(isParquetList.stream()
-        .mapToDouble(AbfsReadFooterMetrics::getFileLength).average().orElse(0.0));
-    avgParquetReadFooterMetrics.setAvgReadLenRequested(isParquetList.stream().
-        map(AbfsReadFooterMetrics::getAvgReadLenRequested).
-        mapToDouble(Double::doubleValue).average().orElse(0.0));
-  }
-
-  /**
-   * Calculates the average metrics from a list of non-Parquet AbfsReadFooterMetrics instances.
-   *
-   * This method takes a list of AbfsReadFooterMetrics representing non-Parquet reads and calculates
-   * the average values for the size read by the first read and the offset difference between the first
-   * and second read. The averages are then set in the provided AbfsReadFooterMetrics instance.
-   *
-   * @param isNonParquetList A list of AbfsReadFooterMetrics instances representing non-Parquet reads.
-   * @param avgNonParquetReadFooterMetrics The AbfsReadFooterMetrics instance to store the calculated averages.
-   *                                      It is assumed that the size of the list is at least 1, and the first
-   *                                      element of the list is used to determine the size of arrays.
-   *                                      The instance is modified in-place with the calculated averages.
-   *
-   *
-   **/
-  private void getNonParquetReadFooterMetricsAverage(List<AbfsReadFooterMetrics> isNonParquetList,
-                                                     AbfsReadFooterMetrics avgNonParquetReadFooterMetrics) {
-    int size = isNonParquetList.get(0).getSizeReadByFirstRead().split("_").length;
-    double[] store = new double[2 * size];
-    // Calculating sum of individual values
-    isNonParquetList.forEach(abfsReadFooterMetrics -> {
-      String[] firstReadSize = abfsReadFooterMetrics.getSizeReadByFirstRead().split("_");
-      String[] offDiffFirstSecondRead = abfsReadFooterMetrics.getOffsetDiffBetweenFirstAndSecondRead().split("_");
-
-      for (int i = 0; i < firstReadSize.length; i++) {
-        store[i] += Long.parseLong(firstReadSize[i]);
-        store[i + size] += Long.parseLong(offDiffFirstSecondRead[i]);
-      }
-    });
-
-    // Calculating averages and creating formatted strings
-    StringJoiner firstReadSize = new StringJoiner("_");
-    StringJoiner offDiffFirstSecondRead = new StringJoiner("_");
-
-    for (int j = 0; j < size; j++) {
-      firstReadSize.add(String.format("%.3f", store[j] / isNonParquetList.size()));
-      offDiffFirstSecondRead.add(String.format("%.3f", store[j + size] / isNonParquetList.size()));
+
+    /**
+     * Checks and updates the metrics for a given file read.
+     *
+     * @param filePathIdentifier the file path identifier
+     * @param len the length of the read
+     * @param contentLength the total content length of the file
+     * @param nextReadPos the position of the next read
+     */
+    public void updateReadMetrics(final String filePathIdentifier,
+                                  final int len,
+                                  final long contentLength,
+                                  final long nextReadPos) {
+        FileTypeMetrics fileTypeMetrics = fileTypeMetricsMap.computeIfAbsent(filePathIdentifier, key -> new FileTypeMetrics());
+        if (fileTypeMetrics.getReadCount() == 0 || (fileTypeMetrics.getReadCount() >= 1 && fileTypeMetrics.getCollectMetrics())) {
+            updateMetrics(fileTypeMetrics, len, contentLength, nextReadPos);
+        }
     }
     }
 
 
-    avgNonParquetReadFooterMetrics.setSizeReadByFirstRead(firstReadSize.toString());
-    avgNonParquetReadFooterMetrics.setOffsetDiffBetweenFirstAndSecondRead(offDiffFirstSecondRead.toString());
-    avgNonParquetReadFooterMetrics.setAvgFileLength(isNonParquetList.stream()
-            .mapToDouble(AbfsReadFooterMetrics::getFileLength).average().orElse(0.0));
-    avgNonParquetReadFooterMetrics.setAvgReadLenRequested(isNonParquetList.stream()
-            .mapToDouble(AbfsReadFooterMetrics::getAvgReadLenRequested).average().orElse(0.0));
-  }
-
-  /*
-  Acronyms:
-  1.FR :- First Read (In case of parquet we only maintain the size requested by application for
-  the first read, in case of non parquet we maintain a string separated by "_" delimiter where the first
-  substring represents the len requested for first read and the second substring represents the seek pointer difference from the
-  end of the file.)
-  2.SR :- Second Read (In case of parquet we only maintain the size requested by application for
-  the second read, in case of non parquet we maintain a string separated by "_" delimiter where the first
-  substring represents the len requested for second read and the second substring represents the seek pointer difference from the
-  offset of the first read.)
-  3.FL :- Total length of the file requested for read
-   */
-  public String getReadFooterMetrics(AbfsReadFooterMetrics avgReadFooterMetrics) {
-    String readFooterMetric = "";
-    if (avgReadFooterMetrics.getIsParquetFile()) {
-      readFooterMetric += "$Parquet:";
-    } else {
-      readFooterMetric += "$NonParquet:";
+    /**
+     * Updates metrics for a specific file identified by filePathIdentifier.
+     *
+     * @param fileTypeMetrics    File metadata to know file type.
+     * @param len                The length of the read operation.
+     * @param contentLength      The total content length of the file.
+     * @param nextReadPos        The position of the next read operation.
+     */
+    private void updateMetrics(FileTypeMetrics fileTypeMetrics,
+                               int len,
+                               long contentLength,
+                               long nextReadPos) {
+        fileTypeMetrics.incrementReadCount();
+
+        long readCount = fileTypeMetrics.getReadCount();
+
+        if (readCount == 1) {
+            handleFirstRead(fileTypeMetrics, nextReadPos, len, contentLength);
+        } else if (readCount == 2) {
+            handleSecondRead(fileTypeMetrics, nextReadPos, len, contentLength);
+        } else {
+            handleFurtherRead(fileTypeMetrics, len);
+        }
     }
     }
-    readFooterMetric += "$FR=" + avgReadFooterMetrics.getSizeReadByFirstRead()
-        + "$SR="
-        + avgReadFooterMetrics.getOffsetDiffBetweenFirstAndSecondRead()
-        + "$FL=" + String.format("%.3f",
-        avgReadFooterMetrics.getAvgFileLength())
-        + "$RL=" + String.format("%.3f",
-        avgReadFooterMetrics.getAvgReadLenRequested());
-    return readFooterMetric;
-  }
 
 
-/**
- * Retrieves and aggregates read footer metrics for both Parquet and non-Parquet files from a list
- * of AbfsReadFooterMetrics instances. The function calculates the average metrics separately for
- * Parquet and non-Parquet files and returns a formatted string containing the aggregated metrics.
- *
- * @param readFooterMetricsList A list of AbfsReadFooterMetrics instances containing read footer metrics
- *                              for both Parquet and non-Parquet files.
- *
- * @return A formatted string containing the aggregated read footer metrics for both Parquet and non-Parquet files.
- *
- **/
-private String getFooterMetrics(List<AbfsReadFooterMetrics> readFooterMetricsList) {
-  List<AbfsReadFooterMetrics> isParquetList = new ArrayList<>();
-  List<AbfsReadFooterMetrics> isNonParquetList = new ArrayList<>();
-  for (AbfsReadFooterMetrics abfsReadFooterMetrics : readFooterMetricsList) {
-    if (abfsReadFooterMetrics.getIsParquetFile()) {
-      isParquetList.add(abfsReadFooterMetrics);
-    } else {
-      if (abfsReadFooterMetrics.getReadCount() >= 2) {
-        isNonParquetList.add(abfsReadFooterMetrics);
-      }
+    /**
+     * Handles the first read operation by checking if the current read position is near the end of the file.
+     * If it is, updates the {@link FileTypeMetrics} object to enable metrics collection and records the first read's
+     * offset and size.
+     *
+     * @param fileTypeMetrics The {@link FileTypeMetrics} object to update with metrics and read details.
+     * @param nextReadPos The position where the next read will start.
+     * @param len The length of the current read operation.
+     * @param contentLength The total length of the file content.
+     */
+    private void handleFirstRead(FileTypeMetrics fileTypeMetrics,
+                                 long nextReadPos,
+                                 int len,
+                                 long contentLength) {
+        if (nextReadPos >= contentLength - (long) Integer.parseInt(FOOTER_LENGTH) * ONE_KB) {
+            fileTypeMetrics.setCollectMetrics(true);
+            fileTypeMetrics.setCollectMetricsForNextRead(true);
+            fileTypeMetrics.setOffsetOfFirstRead(nextReadPos);
+            fileTypeMetrics.setSizeReadByFirstRead(len + "_" + Math.abs(contentLength - nextReadPos));
+        }
     }
     }
-  }
-  AbfsReadFooterMetrics avgParquetReadFooterMetrics = new AbfsReadFooterMetrics();
-  AbfsReadFooterMetrics avgNonparquetReadFooterMetrics = new AbfsReadFooterMetrics();
-  String readFooterMetric = "";
-  if (!isParquetList.isEmpty()) {
-    avgParquetReadFooterMetrics.setIsParquetFile(true);
-    getParquetReadFooterMetricsAverage(isParquetList, avgParquetReadFooterMetrics);
-    readFooterMetric += getReadFooterMetrics(avgParquetReadFooterMetrics);
-  }
-  if (!isNonParquetList.isEmpty()) {
-    avgNonparquetReadFooterMetrics.setIsParquetFile(false);
-    getNonParquetReadFooterMetricsAverage(isNonParquetList, avgNonparquetReadFooterMetrics);
-    readFooterMetric += getReadFooterMetrics(avgNonparquetReadFooterMetrics);
-  }
-  return readFooterMetric;
-}
 
 
+    /**
+     * Handles the second read operation by checking if metrics collection is enabled for the next read.
+     * If it is, calculates the offset difference between the first and second reads, updates the {@link FileTypeMetrics}
+     * object with this information, and sets the file type. Then, updates the metrics data.
+     *
+     * @param fileTypeMetrics The {@link FileTypeMetrics} object to update with metrics and read details.
+     * @param nextReadPos The position where the next read will start.
+     * @param len The length of the current read operation.
+     * @param contentLength The total length of the file content.
+     */
+    private void handleSecondRead(FileTypeMetrics fileTypeMetrics,
+                                  long nextReadPos,
+                                  int len,
+                                  long contentLength) {
+        if (fileTypeMetrics.getCollectMetricsForNextRead()) {
+            long offsetDiff = Math.abs(nextReadPos - fileTypeMetrics.getOffsetOfFirstRead());
+            fileTypeMetrics.setOffsetDiffBetweenFirstAndSecondRead(len + "_" + offsetDiff);
+            fileTypeMetrics.setCollectLenMetrics(true);
+            fileTypeMetrics.updateFileType();
+            updateMetricsData(fileTypeMetrics, len, contentLength);
+        }
+    }
 
 
-  @Override
-  public String toString() {
-    Map<String, AbfsReadFooterMetrics> metricsMap = getMetricsMap();
-    List<AbfsReadFooterMetrics> readFooterMetricsList = new ArrayList<>();
-    if (metricsMap != null && !(metricsMap.isEmpty())) {
-      checkIsParquet(metricsMap);
-      updateLenRequested(metricsMap);
-      for (Map.Entry<String, AbfsReadFooterMetrics> entry : metricsMap.entrySet()) {
-        AbfsReadFooterMetrics abfsReadFooterMetrics = entry.getValue();
-        if (abfsReadFooterMetrics.getCollectMetrics()) {
-          readFooterMetricsList.add(entry.getValue());
+    /**
+     * Handles further read operations beyond the second read. If metrics collection is enabled and the file type is set,
+     * updates the read length requested and increments the read count for the specific file type.
+     *
+     * @param fileTypeMetrics The {@link FileTypeMetrics} object containing metrics and read details.
+     * @param len The length of the current read operation.
+     */
+    private void handleFurtherRead(FileTypeMetrics fileTypeMetrics, int len) {
+        if (fileTypeMetrics.getCollectLenMetrics() && fileTypeMetrics.getFileType() != null) {
+            FileType fileType = fileTypeMetrics.getFileType();
+            addMeanMetricValue(fileType, AVG_READ_LEN_REQUESTED, len);
         }
         }
-      }
     }
     }
-    String readFooterMetrics = "";
-    if (!readFooterMetricsList.isEmpty()) {
-      readFooterMetrics = getFooterMetrics(readFooterMetricsList);
+
+    /**
+     * Updates the metrics data for a specific file identified by the {@link FileTypeMetrics} object.
+     * This method calculates and updates various metrics such as read length requested, file length,
+     * size read by the first read, and offset differences between reads.
+     *
+     * @param fileTypeMetrics The {@link FileTypeMetrics} object containing metrics and read details.
+     * @param len The length of the current read operation.
+     * @param contentLength The total length of the file content.
+     */
+    private void updateMetricsData(FileTypeMetrics fileTypeMetrics,
+                                                int len,
+                                                long contentLength) {
+        long sizeReadByFirstRead = Long.parseLong(fileTypeMetrics.getSizeReadByFirstRead().split("_")[0]);
+        long firstOffsetDiff = Long.parseLong(fileTypeMetrics.getSizeReadByFirstRead().split("_")[1]);
+        long secondOffsetDiff = Long.parseLong(fileTypeMetrics.getOffsetDiffBetweenFirstAndSecondRead().split("_")[1]);
+        FileType fileType = fileTypeMetrics.getFileType();
+
+        addMeanMetricValue(fileType, AVG_READ_LEN_REQUESTED, len);
+        addMeanMetricValue(fileType, AVG_READ_LEN_REQUESTED, sizeReadByFirstRead);
+        addMeanMetricValue(fileType, AVG_FILE_LENGTH, contentLength);
+        addMeanMetricValue(fileType, AVG_SIZE_READ_BY_FIRST_READ, sizeReadByFirstRead);
+        addMeanMetricValue(fileType, AVG_OFFSET_DIFF_BETWEEN_FIRST_AND_SECOND_READ, len);
+        addMeanMetricValue(fileType, AVG_FIRST_OFFSET_DIFF, firstOffsetDiff);
+        addMeanMetricValue(fileType, AVG_SECOND_OFFSET_DIFF, secondOffsetDiff);
+        incrementMetricValue(fileType, TOTAL_FILES);
     }
     }
-    return readFooterMetrics;
-  }
-}
 
 
+    /**
+     * Appends the metrics for a specific file type to the given metric builder.
+     *
+     * @param metricBuilder the metric builder to append the metrics to
+     * @param fileType the file type to append the metrics for
+     */
+    private void appendMetrics(StringBuilder metricBuilder, FileType fileType) {
+        long totalFiles = getCounterMetricValue(fileType, TOTAL_FILES);
+        if (totalFiles <= 0) {
+            return;
+        }
+
+        String sizeReadByFirstRead = getMeanMetricValue(fileType, AVG_SIZE_READ_BY_FIRST_READ);
+        String offsetDiffBetweenFirstAndSecondRead = getMeanMetricValue(fileType, AVG_OFFSET_DIFF_BETWEEN_FIRST_AND_SECOND_READ);
+
+        if (NON_PARQUET.equals(fileType)) {
+            sizeReadByFirstRead += CHAR_UNDERSCORE + getMeanMetricValue(fileType, AVG_FIRST_OFFSET_DIFF);
+            offsetDiffBetweenFirstAndSecondRead += CHAR_UNDERSCORE + getMeanMetricValue(fileType, AVG_SECOND_OFFSET_DIFF);
+        }
+
+        metricBuilder.append(CHAR_DOLLAR)
+                .append(fileType)
+                .append(FIRST_READ)
+                .append(sizeReadByFirstRead)
+                .append(SECOND_READ)
+                .append(offsetDiffBetweenFirstAndSecondRead)
+                .append(FILE_LENGTH)
+                .append(getMeanMetricValue(fileType, AVG_FILE_LENGTH))
+                .append(READ_LENGTH)
+                .append(getMeanMetricValue(fileType, AVG_READ_LEN_REQUESTED));
+    }
+
+    /**
+     * Returns the read footer metrics for all file types.
+     *
+     * @return the read footer metrics as a string
+     */
+    @Override
+    public String toString() {
+        StringBuilder readFooterMetric = new StringBuilder();
+        appendMetrics(readFooterMetric, PARQUET);
+        appendMetrics(readFooterMetric, NON_PARQUET);
+        return readFooterMetric.toString();
+    }
+}

+ 34 - 48
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java

@@ -44,12 +44,25 @@ import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema;
 import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
 import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
 import org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding;
 import org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding;
 import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
 import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
-import java.util.Map;
-import org.apache.hadoop.fs.azurebfs.AbfsBackoffMetrics;
+import org.apache.hadoop.fs.azurebfs.enums.RetryValue;
 import org.apache.http.impl.execchain.RequestAbortedException;
 import org.apache.http.impl.execchain.RequestAbortedException;
 
 
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.PUT_BLOCK_LIST;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.PUT_BLOCK_LIST;
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ZERO;
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ZERO;
+import static org.apache.hadoop.fs.azurebfs.enums.AbfsBackoffMetricsEnum.MAX_RETRY_COUNT;
+import static org.apache.hadoop.fs.azurebfs.enums.AbfsBackoffMetricsEnum.NUMBER_OF_REQUESTS_FAILED;
+import static org.apache.hadoop.fs.azurebfs.enums.AbfsBackoffMetricsEnum.NUMBER_OF_REQUESTS_SUCCEEDED;
+import static org.apache.hadoop.fs.azurebfs.enums.AbfsBackoffMetricsEnum.NUMBER_OF_REQUESTS_SUCCEEDED_WITHOUT_RETRYING;
+import static org.apache.hadoop.fs.azurebfs.enums.AbfsBackoffMetricsEnum.TOTAL_NUMBER_OF_REQUESTS;
+import static org.apache.hadoop.fs.azurebfs.enums.AbfsBackoffMetricsEnum.NUMBER_OF_IOPS_THROTTLED_REQUESTS;
+import static org.apache.hadoop.fs.azurebfs.enums.AbfsBackoffMetricsEnum.NUMBER_OF_OTHER_THROTTLED_REQUESTS;
+import static org.apache.hadoop.fs.azurebfs.enums.AbfsBackoffMetricsEnum.NUMBER_OF_BANDWIDTH_THROTTLED_REQUESTS;
+import static org.apache.hadoop.fs.azurebfs.enums.AbfsBackoffMetricsEnum.NUMBER_OF_NETWORK_FAILED_REQUESTS;
+import static org.apache.hadoop.fs.azurebfs.enums.AbfsBackoffMetricsEnum.MIN_BACK_OFF;
+import static org.apache.hadoop.fs.azurebfs.enums.AbfsBackoffMetricsEnum.MAX_BACK_OFF;
+import static org.apache.hadoop.fs.azurebfs.enums.AbfsBackoffMetricsEnum.TOTAL_BACK_OFF;
+import static org.apache.hadoop.fs.azurebfs.enums.AbfsBackoffMetricsEnum.TOTAL_REQUESTS;
+import static org.apache.hadoop.fs.azurebfs.enums.RetryValue.getRetryValue;
 import static org.apache.hadoop.util.Time.now;
 import static org.apache.hadoop.util.Time.now;
 
 
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_CONTINUE;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_CONTINUE;
@@ -95,7 +108,6 @@ public class AbfsRestOperation {
   private AbfsHttpOperation result;
   private AbfsHttpOperation result;
   private final AbfsCounters abfsCounters;
   private final AbfsCounters abfsCounters;
   private AbfsBackoffMetrics abfsBackoffMetrics;
   private AbfsBackoffMetrics abfsBackoffMetrics;
-  private Map<String, AbfsBackoffMetrics> metricsMap;
   /**
   /**
    * This variable contains the reason of last API call within the same
    * This variable contains the reason of last API call within the same
    * AbfsRestOperation object.
    * AbfsRestOperation object.
@@ -219,9 +231,6 @@ public class AbfsRestOperation {
     if (abfsCounters != null) {
     if (abfsCounters != null) {
       this.abfsBackoffMetrics = abfsCounters.getAbfsBackoffMetrics();
       this.abfsBackoffMetrics = abfsCounters.getAbfsBackoffMetrics();
     }
     }
-    if (abfsBackoffMetrics != null) {
-      this.metricsMap = abfsBackoffMetrics.getMetricsMap();
-    }
     this.maxIoRetries = abfsConfiguration.getMaxIoRetries();
     this.maxIoRetries = abfsConfiguration.getMaxIoRetries();
     this.intercept = client.getIntercept();
     this.intercept = client.getIntercept();
     this.abfsConfiguration = abfsConfiguration;
     this.abfsConfiguration = abfsConfiguration;
@@ -308,7 +317,7 @@ public class AbfsRestOperation {
     long sleepDuration = 0L;
     long sleepDuration = 0L;
     if (abfsBackoffMetrics != null) {
     if (abfsBackoffMetrics != null) {
       synchronized (this) {
       synchronized (this) {
-        abfsBackoffMetrics.incrementTotalNumberOfRequests();
+        abfsBackoffMetrics.incrementMetricValue(TOTAL_NUMBER_OF_REQUESTS);
       }
       }
     }
     }
     while (!executeHttpOperation(retryCount, tracingContext)) {
     while (!executeHttpOperation(retryCount, tracingContext)) {
@@ -354,17 +363,17 @@ public class AbfsRestOperation {
               || statusCode >= HttpURLConnection.HTTP_INTERNAL_ERROR) {
               || statusCode >= HttpURLConnection.HTTP_INTERNAL_ERROR) {
         synchronized (this) {
         synchronized (this) {
           if (retryCount >= maxIoRetries) {
           if (retryCount >= maxIoRetries) {
-            abfsBackoffMetrics.incrementNumberOfRequestsFailed();
+            abfsBackoffMetrics.incrementMetricValue(NUMBER_OF_REQUESTS_FAILED);
           }
           }
         }
         }
       } else {
       } else {
         synchronized (this) {
         synchronized (this) {
           if (retryCount > ZERO && retryCount <= maxIoRetries) {
           if (retryCount > ZERO && retryCount <= maxIoRetries) {
-            maxRetryCount = Math.max(abfsBackoffMetrics.getMaxRetryCount(), retryCount);
-            abfsBackoffMetrics.setMaxRetryCount(maxRetryCount);
+            maxRetryCount = Math.max(abfsBackoffMetrics.getMetricValue(MAX_RETRY_COUNT), retryCount);
+            abfsBackoffMetrics.setMetricValue(MAX_RETRY_COUNT, maxRetryCount);
             updateCount(retryCount);
             updateCount(retryCount);
           } else {
           } else {
-            abfsBackoffMetrics.incrementNumberOfRequestsSucceededWithoutRetrying();
+            abfsBackoffMetrics.incrementMetricValue(NUMBER_OF_REQUESTS_SUCCEEDED_WITHOUT_RETRYING);
           }
           }
         }
         }
       }
       }
@@ -432,12 +441,12 @@ public class AbfsRestOperation {
                     AzureServiceErrorCode.INGRESS_OVER_ACCOUNT_LIMIT)
                     AzureServiceErrorCode.INGRESS_OVER_ACCOUNT_LIMIT)
                     || serviceErrorCode.equals(
                     || serviceErrorCode.equals(
                     AzureServiceErrorCode.EGRESS_OVER_ACCOUNT_LIMIT)) {
                     AzureServiceErrorCode.EGRESS_OVER_ACCOUNT_LIMIT)) {
-              abfsBackoffMetrics.incrementNumberOfBandwidthThrottledRequests();
+              abfsBackoffMetrics.incrementMetricValue(NUMBER_OF_BANDWIDTH_THROTTLED_REQUESTS);
             } else if (serviceErrorCode.equals(
             } else if (serviceErrorCode.equals(
                     AzureServiceErrorCode.TPS_OVER_ACCOUNT_LIMIT)) {
                     AzureServiceErrorCode.TPS_OVER_ACCOUNT_LIMIT)) {
-              abfsBackoffMetrics.incrementNumberOfIOPSThrottledRequests();
+              abfsBackoffMetrics.incrementMetricValue(NUMBER_OF_IOPS_THROTTLED_REQUESTS);
             } else {
             } else {
-              abfsBackoffMetrics.incrementNumberOfOtherThrottledRequests();
+              abfsBackoffMetrics.incrementMetricValue(NUMBER_OF_OTHER_THROTTLED_REQUESTS);
             }
             }
           }
           }
         }
         }
@@ -483,7 +492,7 @@ public class AbfsRestOperation {
       }
       }
       if (abfsBackoffMetrics != null) {
       if (abfsBackoffMetrics != null) {
         synchronized (this) {
         synchronized (this) {
-          abfsBackoffMetrics.incrementNumberOfNetworkFailedRequests();
+          abfsBackoffMetrics.incrementMetricValue(NUMBER_OF_NETWORK_FAILED_REQUESTS);
         }
         }
       }
       }
       if (!retryPolicy.shouldRetry(retryCount, -1)) {
       if (!retryPolicy.shouldRetry(retryCount, -1)) {
@@ -498,7 +507,7 @@ public class AbfsRestOperation {
       }
       }
       if (abfsBackoffMetrics != null) {
       if (abfsBackoffMetrics != null) {
         synchronized (this) {
         synchronized (this) {
-          abfsBackoffMetrics.incrementNumberOfNetworkFailedRequests();
+          abfsBackoffMetrics.incrementMetricValue(NUMBER_OF_NETWORK_FAILED_REQUESTS);
         }
         }
       }
       }
       failureReason = RetryReason.getAbbreviation(ex, -1, "");
       failureReason = RetryReason.getAbbreviation(ex, -1, "");
@@ -627,8 +636,7 @@ public class AbfsRestOperation {
    * This method increments the number of succeeded requests for the specified retry count.
    * This method increments the number of succeeded requests for the specified retry count.
    */
    */
   private void updateCount(int retryCount){
   private void updateCount(int retryCount){
-      String retryCounter = getKey(retryCount);
-      metricsMap.get(retryCounter).incrementNumberOfRequestsSucceeded();
+      abfsBackoffMetrics.incrementMetricValue(NUMBER_OF_REQUESTS_SUCCEEDED, getRetryValue(retryCount));
   }
   }
 
 
   /**
   /**
@@ -641,36 +649,14 @@ public class AbfsRestOperation {
    */
    */
   private void updateBackoffTimeMetrics(int retryCount, long sleepDuration) {
   private void updateBackoffTimeMetrics(int retryCount, long sleepDuration) {
     synchronized (this) {
     synchronized (this) {
-      String retryCounter = getKey(retryCount);
-      AbfsBackoffMetrics abfsBackoffMetrics = metricsMap.get(retryCounter);
-      long minBackoffTime = Math.min(abfsBackoffMetrics.getMinBackoff(), sleepDuration);
-      long maxBackoffForTime = Math.max(abfsBackoffMetrics.getMaxBackoff(), sleepDuration);
-      long totalBackoffTime = abfsBackoffMetrics.getTotalBackoff() + sleepDuration;
-      abfsBackoffMetrics.incrementTotalRequests();
-      abfsBackoffMetrics.setMinBackoff(minBackoffTime);
-      abfsBackoffMetrics.setMaxBackoff(maxBackoffForTime);
-      abfsBackoffMetrics.setTotalBackoff(totalBackoffTime);
-      metricsMap.put(retryCounter, abfsBackoffMetrics);
-    }
-  }
-
-  /**
-   * Generates a key based on the provided retry count to categorize metrics.
-   *
-   * @param retryCount The retry count used to determine the key.
-   * @return A string key representing the metrics category for the given retry count.
-   *
-   * This method categorizes retry counts into different ranges and assigns a corresponding key.
-   */
-  private String getKey(int retryCount) {
-    if (retryCount >= MIN_FIRST_RANGE && retryCount < MAX_FIRST_RANGE) {
-      return Integer.toString(retryCount);
-    } else if (retryCount >= MAX_FIRST_RANGE && retryCount < MAX_SECOND_RANGE) {
-      return "5_15";
-    } else if (retryCount >= MAX_SECOND_RANGE && retryCount < MAX_THIRD_RANGE) {
-      return "15_25";
-    } else {
-      return "25AndAbove";
+      RetryValue retryCounter = getRetryValue(retryCount);
+      long minBackoffTime = Math.min(abfsBackoffMetrics.getMetricValue(MIN_BACK_OFF, retryCounter), sleepDuration);
+      long maxBackoffForTime = Math.max(abfsBackoffMetrics.getMetricValue(MAX_BACK_OFF, retryCounter), sleepDuration);
+      long totalBackoffTime = abfsBackoffMetrics.getMetricValue(TOTAL_BACK_OFF, retryCounter) + sleepDuration;
+      abfsBackoffMetrics.incrementMetricValue(TOTAL_REQUESTS, retryCounter);
+      abfsBackoffMetrics.setMetricValue(MIN_BACK_OFF, minBackoffTime, retryCounter);
+      abfsBackoffMetrics.setMetricValue(MAX_BACK_OFF, maxBackoffForTime, retryCounter);
+      abfsBackoffMetrics.setMetricValue(TOTAL_BACK_OFF, totalBackoffTime, retryCounter);
     }
     }
   }
   }
 
 

+ 153 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbstractAbfsStatisticsSource.java

@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
+import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
+
+/**
+ * Abstract class for Abfs statistics source.
+ */
+public abstract class AbstractAbfsStatisticsSource implements IOStatisticsSource {
+    private IOStatisticsStore ioStatisticsStore;
+
+    /**
+     * Default constructor.
+     */
+    protected AbstractAbfsStatisticsSource() {
+    }
+
+    /**
+     * Returns the IOStatisticsStore instance.
+     *
+     * @return the IOStatisticsStore instance
+     */
+    @Override
+    public IOStatistics getIOStatistics() {
+        return ioStatisticsStore;
+    }
+
+    /**
+     * Sets the IOStatisticsStore instance.
+     *
+     * @param ioStatisticsStore the IOStatisticsStore instance to set
+     */
+    protected void setIOStatistics(final IOStatisticsStore ioStatisticsStore) {
+        this.ioStatisticsStore = ioStatisticsStore;
+    }
+
+    /**
+     * Increments the counter value by 1 for the given name.
+     *
+     * @param name the name of the counter
+     */
+    protected void incCounterValue(String name) {
+        incCounterValue(name, 1);
+    }
+
+    /**
+     * Increments the counter value by the specified value for the given name.
+     *
+     * @param name the name of the counter
+     * @param value the value to increment by
+     */
+    protected void incCounterValue(String name, long value) {
+        ioStatisticsStore.incrementCounter(name, value);
+    }
+
+    /**
+     * Looks up the counter value for the given name.
+     *
+     * @param name the name of the counter
+     * @return the counter value
+     */
+    protected Long lookupCounterValue(String name) {
+        return ioStatisticsStore.counters().getOrDefault(name, 0L);
+    }
+
+    /**
+     * Sets the counter value for the given name.
+     *
+     * @param name the name of the counter
+     * @param value the value to set
+     */
+    protected void setCounterValue(String name, long value) {
+        ioStatisticsStore.setCounter(name, value);
+    }
+
+    /**
+     * Increments the gauge value by 1 for the given name.
+     *
+     * @param name the name of the gauge
+     */
+    protected void incGaugeValue(String name) {
+        incCounterValue(name, 1);
+    }
+
+    /**
+     * Looks up the gauge value for the given name.
+     *
+     * @param name the name of the gauge
+     * @return the gauge value
+     */
+    protected Long lookupGaugeValue(String name) {
+        return ioStatisticsStore.gauges().getOrDefault(name, 0L);
+    }
+
+    /**
+     * Sets the gauge value for the given name.
+     *
+     * @param name the name of the gauge
+     * @param value the value to set
+     */
+    protected void setGaugeValue(String name, long value) {
+        ioStatisticsStore.setGauge(name, value);
+    }
+
+    /**
+     * Add sample to mean statistics for the given name.
+     *
+     * @param name the name of the mean statistic
+     * @param value the value to set
+     */
+    protected void addMeanStatistic(String name, long value) {
+        ioStatisticsStore.addMeanStatisticSample(name, value);
+    }
+
+    /**
+     * Looks up the mean statistics value for the given name.
+     *
+     * @param name the name of the mean statistic
+     * @return the mean value
+     */
+    protected double lookupMeanStatistic(String name) {
+        return ioStatisticsStore.meanStatistics().get(name).mean();
+    }
+
+    /**
+     * Returns a string representation of the AbstractAbfsStatisticsSource.
+     *
+     * @return a string representation of the AbstractAbfsStatisticsSource
+     */
+    @Override
+    public String toString() {
+        return "AbstractAbfsStatisticsStore{" + ioStatisticsStore + '}';
+    }
+}

+ 25 - 14
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadFooterMetrics.java

@@ -29,6 +29,15 @@ import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.M
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB;
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB;
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_READ_BUFFER_SIZE;
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_READ_BUFFER_SIZE;
+import static org.apache.hadoop.fs.azurebfs.enums.FileType.NON_PARQUET;
+import static org.apache.hadoop.fs.azurebfs.enums.FileType.PARQUET;
+import static org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.AVG_FILE_LENGTH;
+import static org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.AVG_OFFSET_DIFF_BETWEEN_FIRST_AND_SECOND_READ;
+import static org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.AVG_READ_LEN_REQUESTED;
+import static org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.AVG_SIZE_READ_BY_FIRST_READ;
+import static org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.TOTAL_FILES;
+import static org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.AVG_FIRST_OFFSET_DIFF;
+import static org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.AVG_SECOND_OFFSET_DIFF;
 
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystem;
@@ -184,7 +193,7 @@ public class ITestAbfsReadFooterMetrics extends AbstractAbfsScaleTest {
 
 
     // Get non-Parquet metrics and assert metrics equality.
     // Get non-Parquet metrics and assert metrics equality.
     AbfsReadFooterMetrics nonParquetMetrics = getNonParquetMetrics();
     AbfsReadFooterMetrics nonParquetMetrics = getNonParquetMetrics();
-    String metrics = nonParquetMetrics.getReadFooterMetrics(nonParquetMetrics);
+    String metrics = nonParquetMetrics.toString();
     assertMetricsEquality(fs, metrics);
     assertMetricsEquality(fs, metrics);
 
 
     // Close the AzureBlobFileSystem.
     // Close the AzureBlobFileSystem.
@@ -196,11 +205,13 @@ public class ITestAbfsReadFooterMetrics extends AbstractAbfsScaleTest {
    */
    */
   private AbfsReadFooterMetrics getNonParquetMetrics() {
   private AbfsReadFooterMetrics getNonParquetMetrics() {
     AbfsReadFooterMetrics nonParquetMetrics = new AbfsReadFooterMetrics();
     AbfsReadFooterMetrics nonParquetMetrics = new AbfsReadFooterMetrics();
-    nonParquetMetrics.setIsParquetFile(false);
-    nonParquetMetrics.setSizeReadByFirstRead("16384.000_16384.000");
-    nonParquetMetrics.setOffsetDiffBetweenFirstAndSecondRead("1.000_16384.000");
-    nonParquetMetrics.setAvgFileLength(Double.parseDouble("32768.000"));
-    nonParquetMetrics.setAvgReadLenRequested(Double.parseDouble("16384.000"));
+    nonParquetMetrics.addMeanMetricValue(NON_PARQUET, AVG_FILE_LENGTH, Long.parseLong("32768"));
+    nonParquetMetrics.addMeanMetricValue(NON_PARQUET, AVG_READ_LEN_REQUESTED, Long.parseLong("10923"));
+    nonParquetMetrics.addMeanMetricValue(NON_PARQUET, AVG_SIZE_READ_BY_FIRST_READ, Long.parseLong("16384"));
+    nonParquetMetrics.addMeanMetricValue(NON_PARQUET, AVG_OFFSET_DIFF_BETWEEN_FIRST_AND_SECOND_READ, 1);
+    nonParquetMetrics.addMeanMetricValue(NON_PARQUET, AVG_FIRST_OFFSET_DIFF, Long.parseLong("16384"));
+    nonParquetMetrics.addMeanMetricValue(NON_PARQUET, AVG_SECOND_OFFSET_DIFF, Long.parseLong("16384"));
+    nonParquetMetrics.incrementMetricValue(NON_PARQUET, TOTAL_FILES);
     return nonParquetMetrics;
     return nonParquetMetrics;
   }
   }
 
 
@@ -209,11 +220,11 @@ public class ITestAbfsReadFooterMetrics extends AbstractAbfsScaleTest {
    */
    */
   private AbfsReadFooterMetrics getParquetMetrics() {
   private AbfsReadFooterMetrics getParquetMetrics() {
     AbfsReadFooterMetrics parquetMetrics = new AbfsReadFooterMetrics();
     AbfsReadFooterMetrics parquetMetrics = new AbfsReadFooterMetrics();
-    parquetMetrics.setIsParquetFile(true);
-    parquetMetrics.setSizeReadByFirstRead("1024.000");
-    parquetMetrics.setOffsetDiffBetweenFirstAndSecondRead("4096.000");
-    parquetMetrics.setAvgFileLength(Double.parseDouble("8388608.000"));
-    parquetMetrics.setAvgReadLenRequested(0.000);
+    parquetMetrics.addMeanMetricValue(PARQUET, AVG_FILE_LENGTH, Long.parseLong("8388608"));
+    parquetMetrics.addMeanMetricValue(PARQUET, AVG_READ_LEN_REQUESTED, Long.parseLong("2560"));
+    parquetMetrics.addMeanMetricValue(PARQUET, AVG_SIZE_READ_BY_FIRST_READ, Long.parseLong("1024"));
+    parquetMetrics.addMeanMetricValue(PARQUET, AVG_OFFSET_DIFF_BETWEEN_FIRST_AND_SECOND_READ, Long.parseLong("4096"));
+    parquetMetrics.incrementMetricValue(PARQUET, TOTAL_FILES);
     return parquetMetrics;
     return parquetMetrics;
   }
   }
 
 
@@ -326,8 +337,8 @@ public class ITestAbfsReadFooterMetrics extends AbstractAbfsScaleTest {
     AbfsReadFooterMetrics nonParquetMetrics = getNonParquetMetrics();
     AbfsReadFooterMetrics nonParquetMetrics = getNonParquetMetrics();
 
 
     // Concatenate and assert the metrics equality.
     // Concatenate and assert the metrics equality.
-    String metrics = parquetMetrics.getReadFooterMetrics(parquetMetrics);
-    metrics += nonParquetMetrics.getReadFooterMetrics(nonParquetMetrics);
+    String metrics = parquetMetrics.toString();
+    metrics += nonParquetMetrics.toString();
     assertMetricsEquality(fs, metrics);
     assertMetricsEquality(fs, metrics);
 
 
     // Close the AzureBlobFileSystem instance.
     // Close the AzureBlobFileSystem instance.
@@ -394,7 +405,7 @@ public class ITestAbfsReadFooterMetrics extends AbstractAbfsScaleTest {
 
 
       // Get and assert the footer metrics for non-Parquet scenarios.
       // Get and assert the footer metrics for non-Parquet scenarios.
       AbfsReadFooterMetrics nonParquetMetrics = getNonParquetMetrics();
       AbfsReadFooterMetrics nonParquetMetrics = getNonParquetMetrics();
-      String metrics = nonParquetMetrics.getReadFooterMetrics(nonParquetMetrics);
+      String metrics = nonParquetMetrics.toString();
       assertMetricsEquality(fs, metrics);
       assertMetricsEquality(fs, metrics);
 
 
       // Introduce an additional idle period by sleeping.
       // Introduce an additional idle period by sleeping.

+ 115 - 0
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsBackoffMetrics.java

@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.hadoop.fs.azurebfs.enums.AbfsBackoffMetricsEnum.NUMBER_OF_REQUESTS_SUCCEEDED;
+import static org.apache.hadoop.fs.azurebfs.enums.AbfsBackoffMetricsEnum.TOTAL_NUMBER_OF_REQUESTS;
+import static org.apache.hadoop.fs.azurebfs.enums.RetryValue.ONE;
+import static org.apache.hadoop.fs.azurebfs.enums.RetryValue.THREE;
+import static org.apache.hadoop.fs.azurebfs.enums.RetryValue.TWO;
+import static org.apache.hadoop.fs.azurebfs.enums.StatisticTypeEnum.TYPE_COUNTER;
+import static org.apache.hadoop.fs.azurebfs.enums.StatisticTypeEnum.TYPE_GAUGE;
+
+public class TestAbfsBackoffMetrics {
+    private AbfsBackoffMetrics metrics;
+    private static final int TOTAL_COUNTERS = 22;
+    private static final int TOTAL_GAUGES = 21;
+
+    /**
+     * Sets up the test environment by initializing the AbfsBackoffMetrics instance.
+     */
+    @Before
+    public void setUp() {
+        metrics = new AbfsBackoffMetrics();
+    }
+
+    /**
+     * Tests the retrieval of metric names based on the statistic type.
+     */
+    @Test
+    public void retrievesMetricNamesBasedOnStatisticType() {
+        String[] counterMetrics = metrics.getMetricNamesByType(TYPE_COUNTER);
+        String[] gaugeMetrics = metrics.getMetricNamesByType(TYPE_GAUGE);
+        Assertions.assertThat(counterMetrics.length)
+                .describedAs("Counter metrics should have 22 elements")
+                .isEqualTo(TOTAL_COUNTERS);
+        Assertions.assertThat(gaugeMetrics.length)
+                .describedAs("Gauge metrics should have 21 elements")
+                .isEqualTo(TOTAL_GAUGES);
+    }
+
+    /**
+     * Tests the retrieval of the value of a specific metric.
+     */
+    @Test
+    public void retrievesValueOfSpecificMetric() {
+        metrics.setMetricValue(NUMBER_OF_REQUESTS_SUCCEEDED, 5, ONE);
+        Assertions.assertThat(metrics.getMetricValue(NUMBER_OF_REQUESTS_SUCCEEDED, ONE))
+                .describedAs("Number of request succeeded for retry 1 should be 5")
+                .isEqualTo(5);
+        Assertions.assertThat(metrics.getMetricValue(NUMBER_OF_REQUESTS_SUCCEEDED, TWO))
+                .describedAs("Number of request succeeded for other retries except 1 should be 0")
+                .isEqualTo(0);
+    }
+
+    /**
+     * Tests the increment of the value of a specific metric.
+     */
+    @Test
+    public void incrementsValueOfSpecificMetric() {
+        metrics.incrementMetricValue(NUMBER_OF_REQUESTS_SUCCEEDED, ONE);
+        Assertions.assertThat(metrics.getMetricValue(NUMBER_OF_REQUESTS_SUCCEEDED, ONE))
+                .describedAs("Number of request succeeded for retry 1 should be 1")
+                .isEqualTo(1);
+        Assertions.assertThat(metrics.getMetricValue(NUMBER_OF_REQUESTS_SUCCEEDED, THREE))
+                .describedAs("Number of request succeeded for other retries except 1 should be 0")
+                .isEqualTo(0);
+    }
+
+    /**
+     * Tests the string representation of empty backoff metrics.
+     */
+    @Test
+    public void returnsStringRepresentationOfEmptyBackoffMetrics() {
+        Assertions.assertThat(metrics.getMetricValue(TOTAL_NUMBER_OF_REQUESTS))
+                .describedAs("String representation of backoff metrics should be empty")
+                .isEqualTo(0);
+        Assertions.assertThat(metrics.toString())
+                .describedAs("String representation of backoff metrics should be empty")
+                .isEmpty();
+    }
+
+    /**
+     * Tests the string representation of backoff metrics.
+     */
+    @Test
+    public void returnsStringRepresentationOfBackoffMetrics() {
+        metrics.incrementMetricValue(TOTAL_NUMBER_OF_REQUESTS);
+        Assertions.assertThat(metrics.getMetricValue(TOTAL_NUMBER_OF_REQUESTS))
+                .describedAs("String representation of backoff metrics should not be empty")
+                .isEqualTo(1);
+        Assertions.assertThat(metrics.toString())
+                .describedAs("String representation of backoff metrics should not be empty")
+                .contains("$TR=1");
+    }
+}

+ 98 - 0
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsReadFooterMetrics.java

@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.junit.Before;
+
+/**
+ * Unit test for Abfs read footer metrics
+ */
+public class TestAbfsReadFooterMetrics {
+    private static final long CONTENT_LENGTH = 50000;
+    private static final int LENGTH = 10000;
+    private static final int NEXT_READ_POS = 30000;
+    private static final String TEST_FILE1 = "TestFile";
+    private static final String TEST_FILE2 = "TestFile2";
+    private AbfsReadFooterMetrics metrics;
+
+    @Before
+    public void setUp() {
+        metrics = new AbfsReadFooterMetrics();
+    }
+
+    /**
+     * Tests that metrics are updated correctly for the first read of a file.
+     */
+    @Test
+    public void metricsUpdateForFirstRead() {
+        metrics.updateReadMetrics(TEST_FILE1, LENGTH, CONTENT_LENGTH, NEXT_READ_POS);
+        Assertions.assertThat(metrics.getTotalFiles())
+                .describedAs("Total number of files")
+                .isEqualTo(0);
+    }
+
+    /**
+     * Tests that metrics are updated correctly for the second read of the same file.
+     */
+    @Test
+    public void metricsUpdateForSecondRead() {
+        metrics.updateReadMetrics(TEST_FILE1, LENGTH, CONTENT_LENGTH, NEXT_READ_POS);
+        metrics.updateReadMetrics(TEST_FILE1, LENGTH, CONTENT_LENGTH, NEXT_READ_POS+LENGTH);
+        Assertions.assertThat(metrics.getTotalFiles())
+                .describedAs("Total number of files")
+                .isEqualTo(1);
+    }
+
+    /**
+     * Tests that metrics are updated correctly for multiple reads in one files.
+     */
+    @Test
+    public void metricsUpdateForOneFile() {
+        metrics.updateReadMetrics(TEST_FILE1, LENGTH, CONTENT_LENGTH, NEXT_READ_POS);
+        metrics.updateReadMetrics(TEST_FILE1, LENGTH, CONTENT_LENGTH, NEXT_READ_POS+LENGTH);
+        metrics.updateReadMetrics(TEST_FILE1, LENGTH, CONTENT_LENGTH, NEXT_READ_POS+2*LENGTH);
+        Assertions.assertThat(metrics.getTotalFiles())
+                .describedAs("Total number of files")
+                .isEqualTo(1);
+        Assertions.assertThat(metrics.toString())
+                .describedAs("Metrics after reading 3 reads of the same file")
+                .isEqualTo("$NON_PARQUET:$FR=10000.000_20000.000$SR=10000.000_10000.000$FL=50000.000$RL=10000.000");
+    }
+
+    /**
+     * Tests that the getReadFooterMetrics method returns the correct metrics after multiple reads on different files.
+     */
+    @Test
+    public void metricsUpdateForMultipleFiles() {
+        metrics.updateReadMetrics(TEST_FILE1, LENGTH, CONTENT_LENGTH, NEXT_READ_POS);
+        metrics.updateReadMetrics(TEST_FILE1, LENGTH, CONTENT_LENGTH, NEXT_READ_POS+LENGTH);
+        metrics.updateReadMetrics(TEST_FILE1, LENGTH, CONTENT_LENGTH, NEXT_READ_POS+2*LENGTH);
+        metrics.updateReadMetrics(TEST_FILE2, LENGTH, CONTENT_LENGTH/2, NEXT_READ_POS);
+        metrics.updateReadMetrics(TEST_FILE2, LENGTH, CONTENT_LENGTH/2, NEXT_READ_POS+LENGTH);
+        metrics.updateReadMetrics(TEST_FILE2, LENGTH, CONTENT_LENGTH/2, NEXT_READ_POS+2*LENGTH);
+        Assertions.assertThat(metrics.getTotalFiles())
+                .describedAs("Total number of files")
+                .isEqualTo(2);
+        Assertions.assertThat(metrics.toString())
+                .describedAs("Metrics after reading 3 reads of the same file")
+                .isEqualTo("$NON_PARQUET:$FR=10000.000_12500.000$SR=10000.000_10000.000$FL=37500.000$RL=10000.000");
+    }
+}

+ 3 - 2
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRestOperation.java

@@ -24,11 +24,12 @@ import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
 import org.apache.hadoop.fs.azurebfs.utils.MetricFormat;
 import org.apache.hadoop.fs.azurebfs.utils.MetricFormat;
 import org.junit.Test;
 import org.junit.Test;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_DELETE;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_DELETE;
+import static org.apache.hadoop.fs.azurebfs.enums.AbfsBackoffMetricsEnum.NUMBER_OF_REQUESTS_FAILED;
+import static org.apache.hadoop.fs.azurebfs.services.AbfsRestOperationType.DeletePath;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_METRIC_ACCOUNT_KEY;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_METRIC_ACCOUNT_KEY;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_METRIC_ACCOUNT_NAME;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_METRIC_ACCOUNT_NAME;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_METRIC_FORMAT;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_METRIC_FORMAT;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_METRIC_URI;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_METRIC_URI;
-import static org.apache.hadoop.fs.azurebfs.services.AbfsRestOperationType.DeletePath;
 import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
 import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
 import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
 import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
 import java.util.ArrayList;
 import java.util.ArrayList;
@@ -83,7 +84,7 @@ public class TestAbfsRestOperation extends
 
 
     // For retry count greater than the max configured value, the request should fail.
     // For retry count greater than the max configured value, the request should fail.
     Assert.assertEquals("Number of failed requests does not match expected value.",
     Assert.assertEquals("Number of failed requests does not match expected value.",
-            "3", String.valueOf(testClient.getAbfsCounters().getAbfsBackoffMetrics().getNumberOfRequestsFailed()));
+            "3", String.valueOf(testClient.getAbfsCounters().getAbfsBackoffMetrics().getMetricValue(NUMBER_OF_REQUESTS_FAILED)));
 
 
     // Close the AzureBlobFileSystem.
     // Close the AzureBlobFileSystem.
     fs.close();
     fs.close();

+ 24 - 0
hadoop-tools/hadoop-azure/src/test/resources/accountSettings/accountName_settings.xml.template

@@ -50,6 +50,12 @@
 
 
     14. READER_RBAC_USER_CLIENT_ID -> readerRBACUser Service principal's client ID
     14. READER_RBAC_USER_CLIENT_ID -> readerRBACUser Service principal's client ID
     15. READER_RBAC_USER_CLIENT_SECRET -> readerRBACUser Service principal's client secret
     15. READER_RBAC_USER_CLIENT_SECRET -> readerRBACUser Service principal's client secret
+
+  ## METRIC SETTINGS ##
+    16. METRIC_ACCOUNT_NAME -> to metric account name without domain
+    17. METRIC_ACCOUNT_KEY -> Metric account access key
+    18. METRIC_CONTAINER -> name of an metric container
+    19. METRIC_FORMAT -> format of the metric (INTERNAL_BACKOFF_METRIC_FORMAT, INTERNAL_FOOTER_METRIC_FORMAT, INTERNAL_METRIC_FORMAT)
 -->
 -->
 
 
 <configuration>
 <configuration>
@@ -187,4 +193,22 @@
     <name>fs.azure.account.oauth2.reader.client.secret</name>
     <name>fs.azure.account.oauth2.reader.client.secret</name>
     <value>READER_RBAC_USER_CLIENT_ID</value>
     <value>READER_RBAC_USER_CLIENT_ID</value>
   </property>
   </property>
+
+  <!-- METRIC SETTINGS -->
+  <property>
+    <name>fs.azure.metric.account.name</name>
+    <value>METRIC_ACCOUNT_NAME.dfs.core.windows.net</value>
+  </property>
+  <property>
+    <name>fs.azure.metric.account.key</name>
+    <value>METRIC_ACCOUNT_KEY</value>
+  </property>
+  <property>
+    <name>fs.azure.metric.uri</name>
+    <value>https://METRIC_ACCOUNT_NAME.dfs.core.windows.net/METRIC_CONTAINER</value>
+  </property>
+  <property>
+    <name>fs.azure.metric.format</name>
+    <value>METRIC_FORMAT</value>
+  </property>
 </configuration>
 </configuration>