Просмотр исходного кода

HDFS-11534. Add counters for number of blocks in pending IBR. Contributed by Xiaobing Zhou.

Xiaobing Zhou 8 лет назад
Родитель
Коммит
757d9ebcc5

+ 5 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java

@@ -125,7 +125,9 @@ class BPServiceActor implements Runnable {
     this.initialRegistrationComplete = lifelineNnAddr != null ?
         new CountDownLatch(1) : null;
     this.dnConf = dn.getDnConf();
-    this.ibrManager = new IncrementalBlockReportManager(dnConf.ibrInterval);
+    this.ibrManager = new IncrementalBlockReportManager(
+        dnConf.ibrInterval,
+        dn.getMetrics());
     prevBlockReportId = ThreadLocalRandom.current().nextLong();
     scheduler = new Scheduler(dnConf.heartBeatInterval,
         dnConf.getLifelineIntervalMs(), dnConf.blockReportInterval,
@@ -349,7 +351,7 @@ class BPServiceActor implements Runnable {
     // or we will report an RBW replica after the BlockReport already reports
     // a FINALIZED one.
     ibrManager.sendIBRs(bpNamenode, bpRegistration,
-        bpos.getBlockPoolId(), dn.getMetrics());
+        bpos.getBlockPoolId());
 
     long brCreateStartTime = monotonicNow();
     Map<DatanodeStorage, BlockListAsLongs> perVolumeBlockLists =
@@ -672,7 +674,7 @@ class BPServiceActor implements Runnable {
         }
         if (ibrManager.sendImmediately() || sendHeartbeat) {
           ibrManager.sendIBRs(bpNamenode, bpRegistration,
-              bpos.getBlockPoolId(), dn.getMetrics());
+              bpos.getBlockPoolId());
         }
 
         List<DatanodeCommand> cmds = null;

+ 37 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/IncrementalBlockReportManager.java

@@ -52,6 +52,11 @@ class IncrementalBlockReportManager {
     /** The blocks in this IBR. */
     final Map<Block, ReceivedDeletedBlockInfo> blocks = Maps.newHashMap();
 
+    /** Metrics object whose pending-IBR gauges this IBR increments. */
+    private final DataNodeMetrics dnMetrics;
+    PerStorageIBR(final DataNodeMetrics dnMetrics) {
+      this.dnMetrics = dnMetrics;
+    }
     /**
      * Remove the given block from this IBR
      * @return true if the block was removed; otherwise, return false.
@@ -76,6 +81,25 @@ class IncrementalBlockReportManager {
     /** Put the block to this IBR. */
     void put(ReceivedDeletedBlockInfo rdbi) {
       blocks.put(rdbi.getBlock(), rdbi);
+      increaseBlocksCounter(rdbi);
+    }
+
+    /** Increments the gauge matching rdbi's status, then the total gauge. */
+    private void increaseBlocksCounter(final ReceivedDeletedBlockInfo rdbi) {
+      switch (rdbi.getStatus()) {
+      case RECEIVING_BLOCK:
+        dnMetrics.incrBlocksReceivingInPendingIBR();
+        break;
+      case RECEIVED_BLOCK:
+        dnMetrics.incrBlocksReceivedInPendingIBR();
+        break;
+      case DELETED_BLOCK:
+        dnMetrics.incrBlocksDeletedInPendingIBR();
+        break;
+      default:
+        break;
+      }
+      dnMetrics.incrBlocksInPendingIBR();
     }
 
     /**
@@ -114,10 +138,14 @@ class IncrementalBlockReportManager {
 
   /** The timestamp of the last IBR. */
   private volatile long lastIBR;
+  private final DataNodeMetrics dnMetrics;
 
-  IncrementalBlockReportManager(final long ibrInterval) {
+  IncrementalBlockReportManager(
+      final long ibrInterval,
+      final DataNodeMetrics dnMetrics) {
     this.ibrInterval = ibrInterval;
     this.lastIBR = monotonicNow() - ibrInterval;
+    this.dnMetrics = dnMetrics;
   }
 
   boolean sendImmediately() {
@@ -147,6 +175,10 @@ class IncrementalBlockReportManager {
         reports.add(new StorageReceivedDeletedBlocks(entry.getKey(), rdbi));
       }
     }
+
+    /* set blocks to zero */
+    this.dnMetrics.resetBlocksInPendingIBR();
+
     readyToSend = false;
     return reports.toArray(new StorageReceivedDeletedBlocks[reports.size()]);
   }
@@ -162,7 +194,7 @@ class IncrementalBlockReportManager {
 
   /** Send IBRs to namenode. */
   void sendIBRs(DatanodeProtocol namenode, DatanodeRegistration registration,
-      String bpid, DataNodeMetrics metrics) throws IOException {
+      String bpid) throws IOException {
     // Generate a list of the pending reports for each storage under the lock
     final StorageReceivedDeletedBlocks[] reports = generateIBRs();
     if (reports.length == 0) {
@@ -180,8 +212,9 @@ class IncrementalBlockReportManager {
       namenode.blockReceivedAndDeleted(registration, bpid, reports);
       success = true;
     } finally {
-      metrics.addIncrementalBlockReport(monotonicNow() - startTime);
+
       if (success) {
+        dnMetrics.addIncrementalBlockReport(monotonicNow() - startTime);
         lastIBR = startTime;
       } else {
         // If we didn't succeed in sending the report, put all of the
@@ -199,7 +232,7 @@ class IncrementalBlockReportManager {
       // This is the first time we are adding incremental BR state for
       // this storage so create a new map. This is required once per
       // storage, per service actor.
-      perStorage = new PerStorageIBR();
+      perStorage = new PerStorageIBR(dnMetrics);
       pendingIBRs.put(storage, perStorage);
     }
     return perStorage;

+ 36 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java

@@ -30,6 +30,7 @@ import org.apache.hadoop.metrics2.lib.MetricsRegistry;
 import org.apache.hadoop.metrics2.lib.MutableCounterLong;
 import org.apache.hadoop.metrics2.lib.MutableQuantiles;
 import org.apache.hadoop.metrics2.lib.MutableRate;
+import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
 import org.apache.hadoop.metrics2.source.JvmMetrics;
 
 import java.util.concurrent.ThreadLocalRandom;
@@ -126,6 +127,15 @@ public class DataNodeMetrics {
   @Metric MutableRate sendDataPacketTransferNanos;
   final MutableQuantiles[] sendDataPacketTransferNanosQuantiles;
 
+  @Metric("Count of blocks in pending IBR")
+  private MutableGaugeLong blocksInPendingIBR;
+  @Metric("Count of blocks at receiving status in pending IBR")
+  private MutableGaugeLong blocksReceivingInPendingIBR;
+  @Metric("Count of blocks at received status in pending IBR")
+  private MutableGaugeLong blocksReceivedInPendingIBR;
+  @Metric("Count of blocks at deleted status in pending IBR")
+  private MutableGaugeLong blocksDeletedInPendingIBR;
+
   final MetricsRegistry registry = new MetricsRegistry("datanode");
   final String name;
   JvmMetrics jvmMetrics = null;
@@ -415,4 +425,30 @@ public class DataNodeMetrics {
       q.add(latencyMs);
     }
   }
+
+  /**
+   * Resets the total and per-status pending-IBR block gauges to zero.
+   */
+  public void resetBlocksInPendingIBR() {
+    blocksInPendingIBR.set(0);
+    blocksReceivingInPendingIBR.set(0);
+    blocksReceivedInPendingIBR.set(0);
+    blocksDeletedInPendingIBR.set(0);
+  }
+
+  public void incrBlocksInPendingIBR() {
+    blocksInPendingIBR.incr();
+  }
+
+  public void incrBlocksReceivingInPendingIBR() {
+    blocksReceivingInPendingIBR.incr();
+  }
+
+  public void incrBlocksReceivedInPendingIBR() {
+    blocksReceivedInPendingIBR.incr();
+  }
+
+  public void incrBlocksDeletedInPendingIBR() {
+    blocksDeletedInPendingIBR.incr();
+  }
 }

+ 146 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockCountersInPendingIBR.java

@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.timeout;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.client.BlockReportOptions;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
+import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
+import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.test.MetricsAsserts;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+/**
+ * Test counters for number of blocks in pending IBR.
+ */
+public class TestBlockCountersInPendingIBR {
+
+  @Test
+  public void testBlockCounters() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+
+    /*
+     * Set a really long value for dfs.blockreport.intervalMsec and
+     * dfs.heartbeat.interval, so that incremental block reports and heartbeats
+     * won't be sent during this test unless they're triggered manually.
+     */
+    conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 10800000L);
+    conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1080L);
+
+    final MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    try {
+      cluster.waitActive();
+      final DataNode datanode = cluster.getDataNodes().get(0);
+      final DatanodeProtocolClientSideTranslatorPB spy =
+          InternalDataNodeTestUtils.spyOnBposToNN(
+              datanode, cluster.getNameNode());
+
+      /* No incremental block report should have been sent yet. */
+      Mockito.verify(spy, timeout(60000).times(0)).blockReceivedAndDeleted(
+          any(DatanodeRegistration.class),
+          anyString(),
+          any(StorageReceivedDeletedBlocks[].class));
+
+      /*
+       * Create fake block notifications on the DataNode. These will be sent
+       * with the next incremental block report.
+       */
+      final BPServiceActor actor =
+          datanode.getAllBpOs().get(0).getBPServiceActors().get(0);
+      final FsDatasetSpi<?> dataset = datanode.getFSDataset();
+      final DatanodeStorage storage;
+      try (FsDatasetSpi.FsVolumeReferences volumes =
+          dataset.getFsVolumeReferences()) {
+        storage = dataset.getStorage(volumes.get(0).getStorageID());
+      }
+
+      /* block at status of RECEIVING_BLOCK */
+      ReceivedDeletedBlockInfo rdbi = new ReceivedDeletedBlockInfo(
+          new Block(5678, 512, 1000), BlockStatus.RECEIVING_BLOCK, null);
+      actor.getIbrManager().addRDBI(rdbi, storage);
+
+      /* block at status of RECEIVED_BLOCK */
+      rdbi = new ReceivedDeletedBlockInfo(
+          new Block(5679, 512, 1000), BlockStatus.RECEIVED_BLOCK, null);
+      actor.getIbrManager().addRDBI(rdbi, storage);
+
+      /* block at status of DELETED_BLOCK */
+      rdbi = new ReceivedDeletedBlockInfo(
+          new Block(5680, 512, 1000), BlockStatus.DELETED_BLOCK, null);
+      actor.getIbrManager().addRDBI(rdbi, storage);
+
+      /* verify counters before sending IBR */
+      verifyBlockCounters(datanode, 3, 1, 1, 1);
+
+      /* Manually trigger an incremental block report. */
+      datanode.triggerBlockReport(
+          new BlockReportOptions.Factory().setIncremental(true).build());
+
+      /*
+       * triggerBlockReport returns before the block report is actually sent.
+       * Wait for it to be sent here.
+       */
+      Mockito.verify(spy, timeout(60000).times(1)).blockReceivedAndDeleted(
+          any(DatanodeRegistration.class),
+          anyString(),
+          any(StorageReceivedDeletedBlocks[].class));
+
+      /* verify counters after sending IBR */
+      verifyBlockCounters(datanode, 0, 0, 0, 0);
+    } finally {
+      /* Always release the mini cluster, even when an assertion fails. */
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Asserts the four pending-IBR gauges published by the datanode metrics.
+   */
+  private void verifyBlockCounters(final DataNode datanode,
+      final long blocksInPendingIBR, final long blocksReceivingInPendingIBR,
+      final long blocksReceivedInPendingIBR,
+      final long blocksDeletedInPendingIBR) {
+
+    final MetricsRecordBuilder m = MetricsAsserts
+        .getMetrics(datanode.getMetrics().name());
+
+    MetricsAsserts.assertGauge("BlocksInPendingIBR",
+        blocksInPendingIBR, m);
+    MetricsAsserts.assertGauge("BlocksReceivingInPendingIBR",
+        blocksReceivingInPendingIBR, m);
+    MetricsAsserts.assertGauge("BlocksReceivedInPendingIBR",
+        blocksReceivedInPendingIBR, m);
+    MetricsAsserts.assertGauge("BlocksDeletedInPendingIBR",
+        blocksDeletedInPendingIBR, m);
+  }
+}