Explorar o código

HDFS-16406. ReadsFromLocalClient counts short-circuit reads (#3847)

secfree %!s(int64=3) %!d(string=hai) anos
pai
achega
bf0cefb0d8

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java

@@ -415,6 +415,9 @@ class DataXceiver extends Receiver implements Runnable {
                     "Not verifying {}", slotId);
         }
         success = true;
+        // update metrics
+        datanode.metrics.addReadBlockOp(elapsed());
+        datanode.metrics.incrReadsFromClient(true, blk.getNumBytes());
       }
     } finally {
       if ((!success) && (registeredSlotId != null)) {

+ 38 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java

@@ -25,6 +25,7 @@ import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 import static org.junit.Assert.*;
 
 import java.io.Closeable;
+import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
@@ -38,7 +39,10 @@ import java.util.function.Supplier;
 import net.jcip.annotations.NotThreadSafe;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.net.unix.TemporarySocketDirectory;
 import org.apache.hadoop.util.Lists;
+import org.junit.Assume;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -654,4 +658,38 @@ public class TestDataNodeMetrics {
     assertCounter("HeartbeatsForns1-nn1NumOps", 1L, rb);
     assertCounter("HeartbeatsNumOps", 4L, rb);
   }
+
+  @Test
+  public void testNodeLocalMetrics() throws Exception {
+    Assume.assumeTrue(null == DomainSocket.getLoadingFailureReason());
+    Configuration conf = new HdfsConfiguration();
+    conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
+    TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
+    DomainSocket.disableBindPathValidation();
+    conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
+        new File(sockDir.getDir(),
+            "testNodeLocalMetrics._PORT.sock").getAbsolutePath());
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    try {
+      cluster.waitActive();
+      FileSystem fs = cluster.getFileSystem();
+      Path testFile = new Path("/testNodeLocalMetrics.txt");
+      DFSTestUtil.createFile(fs, testFile, 10L, (short)1, 1L);
+      DFSTestUtil.readFile(fs, testFile);
+      List<DataNode> datanodes = cluster.getDataNodes();
+      assertEquals(1, datanodes.size());
+
+      DataNode datanode = datanodes.get(0);
+      MetricsRecordBuilder rb = getMetrics(datanode.getMetrics().name());
+
+      // Write related metrics
+      assertCounter("WritesFromLocalClient", 1L, rb);
+      // Read related metrics
+      assertCounter("ReadsFromLocalClient", 1L, rb);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
 }