소스 검색

HDFS-17262. Fixed the verbose log.warn in DFSUtil.addTransferRateMetric(). (#6290). Contributed by Xing Lin.

Reviewed-by: Ravindra Dingankar <rdingankar@linkedin.com>
Reviewed-by: Simbarashe Dzinamarira <sdzinamarira@linkedin.com>
Reviewed-by: Tao Li <tomscut@apache.org>
Signed-off-by: He Xiaoqiao <hexiaoqiao@apache.org>
Xing Lin 1 년 전
부모
커밋
607c981042

+ 29 - 8
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java

@@ -60,6 +60,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.SortedSet;
 
+import java.util.concurrent.TimeUnit;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.Option;
@@ -1970,16 +1971,36 @@ public class DFSUtil {
   }
 
   /**
-   * Add transfer rate metrics for valid data read and duration values.
+   * Add transfer rate metrics in bytes per second.
    * @param metrics metrics for datanodes
    * @param read bytes read
-   * @param duration read duration
+   * @param durationInNS read duration in nanoseconds
    */
-  public static void addTransferRateMetric(final DataNodeMetrics metrics, final long read, final long duration) {
-    if (read >= 0 && duration > 0) {
-        metrics.addReadTransferRate(read * 1000 / duration);
-    } else {
-      LOG.warn("Unexpected value for data transfer bytes={} duration={}", read, duration);
-    }
+  public static void addTransferRateMetric(final DataNodeMetrics metrics, final long read,
+      final long durationInNS) {
+    metrics.addReadTransferRate(getTransferRateInBytesPerSecond(read, durationInNS));
+  }
+
+  /**
+   * Calculate the transfer rate in bytes per second.
+   *
+   * We have the read duration in nanoseconds for precision for transfers taking a few nanoseconds.
+   * We treat shorter durations below 1 ns as 1 ns as we also want to capture reads taking less
+   * than a nanosecond. To calculate transferRate in bytes per second, we avoid multiplying bytes
+   * read by 10^9 to avoid overflow. Instead, we first calculate the duration in seconds in double
+   * to keep the decimal values for smaller durations. We then divide bytes read by
+   * durationInSeconds to get the transferRate in bytes per second.
+   *
+   * We also replace a negative value for transferred bytes with 0 byte.
+   *
+   * @param bytes bytes read
+   * @param durationInNS read duration in nanoseconds
+   * @return bytes per second
+   */
+  public static long getTransferRateInBytesPerSecond(long bytes, long durationInNS) {
+    bytes = Math.max(bytes, 0);
+    durationInNS = Math.max(durationInNS, 1);
+    double durationInSeconds = (double) durationInNS / TimeUnit.SECONDS.toNanos(1);
+    return (long) (bytes / durationInSeconds);
   }
 }

+ 8 - 8
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java

@@ -607,10 +607,10 @@ class DataXceiver extends Receiver implements Runnable {
       // send op status
       writeSuccessWithChecksumInfo(blockSender, new DataOutputStream(getOutputStream()));
 
-      long beginRead = Time.monotonicNow();
+      long beginReadInNS = Time.monotonicNowNanos();
       // send data
       read = blockSender.sendBlock(out, baseStream, dataXceiverServer.getReadThrottler());
-      long duration = Time.monotonicNow() - beginRead;
+      long durationInNS = Time.monotonicNowNanos() - beginReadInNS;
       if (blockSender.didSendEntireByteRange()) {
         // If we sent the entire range, then we should expect the client
         // to respond with a Status enum.
@@ -633,8 +633,8 @@ class DataXceiver extends Receiver implements Runnable {
       }
       datanode.metrics.incrBytesRead((int) read);
       datanode.metrics.incrBlocksRead();
-      datanode.metrics.incrTotalReadTime(duration);
-      DFSUtil.addTransferRateMetric(datanode.metrics, read, duration);
+      datanode.metrics.incrTotalReadTime(TimeUnit.NANOSECONDS.toMillis(durationInNS));
+      DFSUtil.addTransferRateMetric(datanode.metrics, read, durationInNS);
     } catch ( SocketException ignored ) {
       LOG.trace("{}:Ignoring exception while serving {} to {}",
           dnR, block, remoteAddress, ignored);
@@ -1117,15 +1117,15 @@ class DataXceiver extends Receiver implements Runnable {
       // send status first
       writeSuccessWithChecksumInfo(blockSender, reply);
 
-      long beginRead = Time.monotonicNow();
+      long beginReadInNS = Time.monotonicNowNanos();
       // send block content to the target
       long read = blockSender.sendBlock(reply, baseStream,
                                         dataXceiverServer.balanceThrottler);
-      long duration = Time.monotonicNow() - beginRead;
+      long durationInNS = Time.monotonicNowNanos() - beginReadInNS;
       datanode.metrics.incrBytesRead((int) read);
       datanode.metrics.incrBlocksRead();
-      datanode.metrics.incrTotalReadTime(duration);
-      DFSUtil.addTransferRateMetric(datanode.metrics, read, duration);
+      datanode.metrics.incrTotalReadTime(TimeUnit.NANOSECONDS.toMillis(durationInNS));
+      DFSUtil.addTransferRateMetric(datanode.metrics, read, durationInNS);
       
       LOG.info("Copied {} to {}", block, peer.getRemoteAddressString());
     } catch (IOException ioe) {

+ 27 - 5
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java

@@ -1127,14 +1127,36 @@ public class TestDFSUtil {
   @Test
   public void testAddTransferRateMetricForValidValues() {
     DataNodeMetrics mockMetrics = mock(DataNodeMetrics.class);
-    DFSUtil.addTransferRateMetric(mockMetrics, 100, 10);
-    verify(mockMetrics).addReadTransferRate(10000);
+    DFSUtil.addTransferRateMetric(mockMetrics, 3_251_854_872L, 129_593_000_000L);
+    verify(mockMetrics).addReadTransferRate(250_92_828L);
   }
 
   @Test
-  public void testAddTransferRateMetricForInvalidValue() {
+  public void testAddTransferRateMetricForZeroNSTransferDuration() {
     DataNodeMetrics mockMetrics = mock(DataNodeMetrics.class);
-    DFSUtil.addTransferRateMetric(mockMetrics, 100, 0);
-    verify(mockMetrics, times(0)).addReadTransferRate(anyLong());
+    DFSUtil.addTransferRateMetric(mockMetrics, 1L, 0);
+    verify(mockMetrics).addReadTransferRate(999_999_999L);
+  }
+
+  @Test
+  public void testAddTransferRateMetricNegativeTransferBytes() {
+    DataNodeMetrics mockMetrics = mock(DataNodeMetrics.class);
+    DFSUtil.addTransferRateMetric(mockMetrics, -1L, 0);
+    verify(mockMetrics).addReadTransferRate(0L);
+  }
+
+  @Test
+  public void testAddTransferRateMetricZeroTransferBytes() {
+    DataNodeMetrics mockMetrics = mock(DataNodeMetrics.class);
+    DFSUtil.addTransferRateMetric(mockMetrics, -1L, 0);
+    verify(mockMetrics).addReadTransferRate(0L);
+  }
+
+  @Test
+  public void testGetTransferRateInBytesPerSecond() {
+    assertEquals(999_999_999, DFSUtil.getTransferRateInBytesPerSecond(1L, 1L));
+    assertEquals(999_999_999, DFSUtil.getTransferRateInBytesPerSecond(1L, 0L));
+    assertEquals(102_400_000,
+        DFSUtil.getTransferRateInBytesPerSecond(512_000_000L, 5_000_000_000L));
   }
 }