Browse Source

HDFS-16488. [SPS]: Expose metrics to JMX for external SPS (#4035)

litao 3 years ago
parent
commit
acc0e0a210

+ 10 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java

@@ -301,6 +301,16 @@ public class BlockStorageMovementAttemptedItems {
     }
   }
 
+  /**
+   * Returns the attempted-items list held by this object. The list itself is
+   * returned, not a copy, so changes made by the caller are visible here.
+   *
+   * @return the backing list of attempted items.
+   */
+  @VisibleForTesting
+  public List<AttemptedItemInfo> getStorageMovementAttemptedItems() {
+    return this.storageMovementAttemptedItems;
+  }
+
+  /**
+   * Returns the queue of movement-finished blocks held by this object. The
+   * queue itself is returned, not a copy.
+   *
+   * @return the backing queue of movement-finished blocks.
+   */
+  @VisibleForTesting
+  public BlockingQueue<Block> getMovementFinishedBlocks() {
+    return this.movementFinishedBlocks;
+  }
+
   public void clearQueues() {
     movementFinishedBlocks.clear();
     synchronized (storageMovementAttemptedItems) {

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java

@@ -1077,7 +1077,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
    * attempted or reported time stamp. This is used by
    * {@link BlockStorageMovementAttemptedItems#storageMovementAttemptedItems}.
    */
-  final static class AttemptedItemInfo extends ItemInfo {
+  public final static class AttemptedItemInfo extends ItemInfo {
     private long lastAttemptedOrReportedTime;
     private final Set<Block> blocks;
 
@@ -1095,7 +1095,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
      * @param retryCount
      *          file retry count
      */
-    AttemptedItemInfo(long rootId, long trackId,
+    public AttemptedItemInfo(long rootId, long trackId,
         long lastAttemptedOrReportedTime,
         Set<Block> blocks, int retryCount) {
       super(rootId, trackId, retryCount);

+ 17 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java

@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -39,10 +40,12 @@ import org.apache.hadoop.hdfs.server.namenode.sps.BlockMovementListener;
 import org.apache.hadoop.hdfs.server.namenode.sps.Context;
 import org.apache.hadoop.hdfs.server.namenode.sps.FileCollector;
 import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;
+import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
 import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.DatanodeMap;
 import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.DatanodeWithStorage;
 import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
+import org.apache.hadoop.hdfs.server.sps.metrics.ExternalSPSBeanMetrics;
 import org.apache.hadoop.net.NetworkTopology;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -62,6 +65,7 @@ public class ExternalSPSContext implements Context {
   private final FileCollector fileCollector;
   private final BlockMoveTaskHandler externalHandler;
   private final BlockMovementListener blkMovementListener;
+  private ExternalSPSBeanMetrics spsBeanMetrics;
 
   public ExternalSPSContext(SPSService service, NameNodeConnector nnc) {
     this.service = service;
@@ -208,4 +212,17 @@ public class ExternalSPSContext implements Context {
       LOG.info("Movement attempted blocks", actualBlockMovements);
     }
   }
+
+  /**
+   * Creates the JMX metrics bean for the given satisfier and keeps a
+   * reference to it in this context.
+   *
+   * @param sps satisfier passed to the new {@link ExternalSPSBeanMetrics}.
+   */
+  public void initMetrics(StoragePolicySatisfier sps) {
+    this.spsBeanMetrics = new ExternalSPSBeanMetrics(sps);
+  }
+
+  /**
+   * Closes the metrics bean held by this context. Does nothing when no bean
+   * has been created yet, so it is safe to call from cleanup paths that may
+   * run before {@link #initMetrics(StoragePolicySatisfier)}.
+   */
+  public void closeMetrics() {
+    if (spsBeanMetrics != null) {
+      spsBeanMetrics.close();
+    }
+  }
+
+  /**
+   * Returns the metrics bean held by this context.
+   *
+   * @return the bean created by {@link #initMetrics(StoragePolicySatisfier)},
+   *         or null if that method has not been called.
+   */
+  @VisibleForTesting
+  public ExternalSPSBeanMetrics getSpsBeanMetrics() {
+    return this.spsBeanMetrics;
+  }
 }

+ 9 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java

@@ -48,8 +48,7 @@ import org.slf4j.LoggerFactory;
  */
 @InterfaceAudience.Private
 public final class ExternalStoragePolicySatisfier {
-  public static final Logger LOG = LoggerFactory
-      .getLogger(ExternalStoragePolicySatisfier.class);
+  public static final Logger LOG = LoggerFactory.getLogger(ExternalStoragePolicySatisfier.class);
 
   private ExternalStoragePolicySatisfier() {
     // This is just a class to start and run external sps.
@@ -60,6 +59,7 @@ public final class ExternalStoragePolicySatisfier {
    */
   public static void main(String[] args) throws Exception {
     NameNodeConnector nnc = null;
+    ExternalSPSContext context = null;
     try {
       StringUtils.startupShutdownMessage(StoragePolicySatisfier.class, args,
           LOG);
@@ -69,9 +69,10 @@ public final class ExternalStoragePolicySatisfier {
       StoragePolicySatisfier sps = new StoragePolicySatisfier(spsConf);
       nnc = getNameNodeConnector(spsConf);
 
-      ExternalSPSContext context = new ExternalSPSContext(sps, nnc);
+      context = new ExternalSPSContext(sps, nnc);
       sps.init(context);
       sps.start(StoragePolicySatisfierMode.EXTERNAL);
+      context.initMetrics(sps);
       if (sps != null) {
         sps.join();
       }
@@ -82,6 +83,11 @@ public final class ExternalStoragePolicySatisfier {
       if (nnc != null) {
         nnc.close();
       }
+      if (context!= null) {
+        if (context.getSpsBeanMetrics() != null) {
+          context.closeMetrics();
+        }
+      }
     }
   }
 

+ 100 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/metrics/ExternalSPSBeanMetrics.java

@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.sps.metrics;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.namenode.sps.ItemInfo;
+import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
+import org.apache.hadoop.metrics2.util.MBeans;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.management.NotCompliantMBeanException;
+import javax.management.ObjectName;
+import javax.management.StandardMBean;
+import java.util.HashSet;
+
+/**
+ * Exposes the ExternalSPS metrics over JMX by registering itself as an
+ * {@link ExternalSPSMXBean}.
+ */
+public class ExternalSPSBeanMetrics implements ExternalSPSMXBean {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ExternalSPSBeanMetrics.class);
+
+  /**
+   * Name the ExternalSPS bean is registered under; null when not registered.
+   */
+  private ObjectName externalSPSBeanName;
+  /**
+   * Satisfier that every metric is read from.
+   */
+  private final StoragePolicySatisfier storagePolicySatisfier;
+
+  /**
+   * Wraps this object in a {@link StandardMBean} and registers it with the
+   * service name and bean name "ExternalSPS".
+   *
+   * @param sps satisfier whose queues back the exposed metrics.
+   * @throws RuntimeException if this class is not a compliant MBean.
+   */
+  public ExternalSPSBeanMetrics(StoragePolicySatisfier sps) {
+    this.storagePolicySatisfier = sps;
+    try {
+      StandardMBean bean = new StandardMBean(this, ExternalSPSMXBean.class);
+      this.externalSPSBeanName = MBeans.register("ExternalSPS", "ExternalSPS", bean);
+    } catch (NotCompliantMBeanException e) {
+      throw new RuntimeException("Bad externalSPS MBean setup", e);
+    }
+    // MBeans.register hands back null when registration did not succeed, so
+    // only report success when a name actually came back.
+    if (this.externalSPSBeanName != null) {
+      LOG.info("Registered ExternalSPS MBean: {}", this.externalSPSBeanName);
+    } else {
+      LOG.warn("Failed to register ExternalSPS MBean");
+    }
+  }
+
+  /**
+   * Unregister the JMX interfaces. Calling this more than once is harmless
+   * because the stored name is cleared after the first call.
+   */
+  public void close() {
+    if (externalSPSBeanName != null) {
+      MBeans.unregister(externalSPSBeanName);
+      externalSPSBeanName = null;
+    }
+  }
+
+  /**
+   * @return the value of {@code processingQueueSize()} on the satisfier.
+   */
+  @Override
+  public int getProcessingQueueSize() {
+    return storagePolicySatisfier.processingQueueSize();
+  }
+
+  /**
+   * Test hook: adds one placeholder {@link ItemInfo} to the satisfier's
+   * storage movement queue.
+   */
+  @VisibleForTesting
+  public void updateProcessingQueueSize() {
+    storagePolicySatisfier.getStorageMovementQueue()
+        .add(new ItemInfo(0, 1, 1));
+  }
+
+  /**
+   * @return the movement-finished block count reported by the satisfier's
+   *         attempted-items monitor.
+   */
+  @Override
+  public int getMovementFinishedBlocksCount() {
+    return storagePolicySatisfier.getAttemptedItemsMonitor().getMovementFinishedBlocksCount();
+  }
+
+  /**
+   * Test hook: adds one placeholder {@link Block} to the monitor's
+   * movement-finished queue.
+   */
+  @VisibleForTesting
+  public void updateMovementFinishedBlocksCount() {
+    storagePolicySatisfier.getAttemptedItemsMonitor().getMovementFinishedBlocks()
+        .add(new Block(1));
+  }
+
+  /**
+   * @return the attempted-items count reported by the satisfier's
+   *         attempted-items monitor.
+   */
+  @Override
+  public int getAttemptedItemsCount() {
+    return storagePolicySatisfier.getAttemptedItemsMonitor().getAttemptedItemsCount();
+  }
+
+  /**
+   * Test hook: adds one placeholder attempted item, with an empty block set,
+   * to the monitor's attempted-items list.
+   */
+  @VisibleForTesting
+  public void updateAttemptedItemsCount() {
+    storagePolicySatisfier.getAttemptedItemsMonitor().getStorageMovementAttemptedItems()
+        .add(new StoragePolicySatisfier.AttemptedItemInfo(0, 1, 1, new HashSet<>(), 1));
+  }
+}

+ 52 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/metrics/ExternalSPSMXBean.java

@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.sps.metrics;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * This is the JMX management interface for ExternalSPS information.
+ * End users should not implement this interface; they should instead
+ * read this information through the JMX APIs.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Stable
+public interface ExternalSPSMXBean {
+
+  /**
+   * Gets the queue size of StorageMovementNeeded.
+   *
+   * @return the queue size of StorageMovementNeeded.
+   */
+  int getProcessingQueueSize();
+
+  /**
+   * Gets the count of movement finished blocks.
+   *
+   * @return the count of movement finished blocks.
+   */
+  int getMovementFinishedBlocksCount();
+
+  /**
+   * Gets the count of attempted items.
+   *
+   * @return the count of attempted items.
+   */
+  int getAttemptedItemsCount();
+}

+ 22 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/metrics/package-info.java

@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package provides the ability to expose external SPS metrics to JMX.
+ */
+package org.apache.hadoop.hdfs.server.sps.metrics;

+ 34 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java

@@ -44,6 +44,7 @@ import static org.junit.Assert.fail;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.lang.management.ManagementFactory;
 import java.net.InetSocketAddress;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
@@ -84,6 +85,7 @@ import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.hdfs.server.namenode.sps.BlockMovementListener;
 import org.apache.hadoop.hdfs.server.namenode.sps.BlockStorageMovementAttemptedItems;
 import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
+import org.apache.hadoop.hdfs.server.sps.metrics.ExternalSPSBeanMetrics;
 import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.minikdc.MiniKdc;
 import org.apache.hadoop.security.SecurityUtil;
@@ -102,6 +104,8 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
 import java.util.function.Supplier;
 
 /**
@@ -1817,4 +1821,34 @@ public class TestExternalStoragePolicySatisfier {
       actualBlockMovements.clear();
     }
   }
+
+  /**
+   * Checks that the three ExternalSPS attributes can be read through the
+   * platform MBean server, start at zero, and each become one after the
+   * matching test hook on the bean has added a placeholder entry.
+   */
+  @Test(timeout = 300000)
+  public void testExternalSPSMetrics() throws Exception {
+    try {
+      createCluster();
+      // Start JMX but stop SPS thread to prevent mock data from being consumed.
+      externalSps.stop(true);
+      externalCtxt.initMetrics(externalSps);
+
+      ExternalSPSBeanMetrics spsBeanMetrics = externalCtxt.getSpsBeanMetrics();
+      MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+      ObjectName mxBeanName = new ObjectName("Hadoop:service=ExternalSPS,name=ExternalSPS");
+      // Assert metrics before update.
+      assertEquals(0, mbs.getAttribute(mxBeanName, "AttemptedItemsCount"));
+      assertEquals(0, mbs.getAttribute(mxBeanName, "ProcessingQueueSize"));
+      assertEquals(0, mbs.getAttribute(mxBeanName, "MovementFinishedBlocksCount"));
+
+      // Update metrics.
+      spsBeanMetrics.updateAttemptedItemsCount();
+      spsBeanMetrics.updateProcessingQueueSize();
+      spsBeanMetrics.updateMovementFinishedBlocksCount();
+
+      // Assert metrics after update.
+      assertEquals(1, mbs.getAttribute(mxBeanName, "AttemptedItemsCount"));
+      assertEquals(1, mbs.getAttribute(mxBeanName, "ProcessingQueueSize"));
+      assertEquals(1, mbs.getAttribute(mxBeanName, "MovementFinishedBlocksCount"));
+    } finally {
+      try {
+        // Unregister the bean so it does not stay on the platform MBean server
+        // after this test. The guards cover a failure before initMetrics().
+        if (externalCtxt != null && externalCtxt.getSpsBeanMetrics() != null) {
+          externalCtxt.closeMetrics();
+        }
+      } finally {
+        shutdownCluster();
+      }
+    }
+  }
 }