Bläddra i källkod

HADOOP-13285. DecayRpcScheduler MXBean should only report decayed CallVolumeSummary. Contributed by Xiaoyu Yao.

(cherry picked from commit 0761379fe45898c44c8f161834c298ef932e4d8c)
Xiaoyu Yao 9 år sedan
förälder
incheckning
ed5d3a62ce

+ 16 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java

@@ -901,9 +901,24 @@ public class DecayRpcScheduler implements RpcScheduler,
   public String getCallVolumeSummary() {
   public String getCallVolumeSummary() {
     try {
     try {
       ObjectMapper om = new ObjectMapper();
       ObjectMapper om = new ObjectMapper();
-      return om.writeValueAsString(callCounts);
+      return om.writeValueAsString(getDecayedCallCounts());
     } catch (Exception e) {
     } catch (Exception e) {
       return "Error: " + e.getMessage();
       return "Error: " + e.getMessage();
     }
     }
   }
   }
+
+  private Map<Object, Long> getDecayedCallCounts() {
+    Map<Object, Long> decayedCallCounts = new HashMap<>(callCounts.size());
+    Iterator<Map.Entry<Object, List<AtomicLong>>> it =
+        callCounts.entrySet().iterator();
+    while (it.hasNext()) {
+      Map.Entry<Object, List<AtomicLong>> entry = it.next();
+      Object user = entry.getKey();
+      Long decayedCount = entry.getValue().get(0).get();
+      if (decayedCount > 0) {
+        decayedCallCounts.put(user, decayedCount);
+      }
+    }
+    return decayedCallCounts;
+  }
 }
 }

+ 25 - 5
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java

@@ -30,6 +30,10 @@ import static org.mockito.Mockito.when;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
 
 
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
+
 public class TestDecayRpcScheduler {
 public class TestDecayRpcScheduler {
   private Schedulable mockCall(String id) {
   private Schedulable mockCall(String id) {
     Schedulable mockCall = mock(Schedulable.class);
     Schedulable mockCall = mock(Schedulable.class);
@@ -189,12 +193,14 @@ public class TestDecayRpcScheduler {
 
 
   @Test
   @Test
   @SuppressWarnings("deprecation")
   @SuppressWarnings("deprecation")
-  public void testPriority() {
+  public void testPriority() throws Exception {
     Configuration conf = new Configuration();
     Configuration conf = new Configuration();
-    conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY, "99999999"); // Never flush
-    conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_THRESHOLDS_KEY,
-      "25, 50, 75");
-    scheduler = new DecayRpcScheduler(4, "ns", conf);
+    final String namespace = "ns";
+    conf.set(namespace + "." + DecayRpcScheduler
+        .IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY, "99999999"); // Never flush
+    conf.set(namespace + "." + DecayRpcScheduler
+        .IPC_FCQ_DECAYSCHEDULER_THRESHOLDS_KEY, "25, 50, 75");
+    scheduler = new DecayRpcScheduler(4, namespace, conf);
 
 
     assertEquals(0, scheduler.getPriorityLevel(mockCall("A")));
     assertEquals(0, scheduler.getPriorityLevel(mockCall("A")));
     assertEquals(2, scheduler.getPriorityLevel(mockCall("A")));
     assertEquals(2, scheduler.getPriorityLevel(mockCall("A")));
@@ -206,6 +212,20 @@ public class TestDecayRpcScheduler {
     assertEquals(1, scheduler.getPriorityLevel(mockCall("A")));
     assertEquals(1, scheduler.getPriorityLevel(mockCall("A")));
     assertEquals(1, scheduler.getPriorityLevel(mockCall("A")));
     assertEquals(1, scheduler.getPriorityLevel(mockCall("A")));
     assertEquals(2, scheduler.getPriorityLevel(mockCall("A")));
     assertEquals(2, scheduler.getPriorityLevel(mockCall("A")));
+
+    MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+    ObjectName mxbeanName = new ObjectName(
+        "Hadoop:service="+ namespace + ",name=DecayRpcScheduler");
+
+    String cvs1 = (String) mbs.getAttribute(mxbeanName, "CallVolumeSummary");
+    assertTrue("Get expected JMX of CallVolumeSummary before decay",
+        cvs1.equals("{\"A\":6,\"B\":2,\"C\":2}"));
+
+    scheduler.forceDecay();
+
+    String cvs2 = (String) mbs.getAttribute(mxbeanName, "CallVolumeSummary");
+    assertTrue("Get expected JMX for CallVolumeSummary after decay",
+        cvs2.equals("{\"A\":3,\"B\":1,\"C\":1}"));
   }
   }
 
 
   @Test(timeout=2000)
   @Test(timeout=2000)