Browse Source

HADOOP-15121. Encounter NullPointerException when using DecayRpcScheduler. Contributed by Tao Jie.

Hanisha Koneru 7 years ago
parent
commit
3fde0f1db5

+ 10 - 5
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java

@@ -236,8 +236,8 @@ public class DecayRpcScheduler implements RpcScheduler,
     DecayTask task = new DecayTask(this, timer);
     timer.scheduleAtFixedRate(task, decayPeriodMillis, decayPeriodMillis);
 
-    metricsProxy = MetricsProxy.getInstance(ns, numLevels);
-    metricsProxy.setDelegate(this);
+    metricsProxy = MetricsProxy.getInstance(ns, numLevels, this);
+    recomputeScheduleCache();
   }
 
   // Load configs
@@ -680,21 +680,26 @@ public class DecayRpcScheduler implements RpcScheduler,
     private long[] callCountInLastWindowDefault;
     private ObjectName decayRpcSchedulerInfoBeanName;
 
-    private MetricsProxy(String namespace, int numLevels) {
+    private MetricsProxy(String namespace, int numLevels,
+        DecayRpcScheduler drs) {
       averageResponseTimeDefault = new double[numLevels];
       callCountInLastWindowDefault = new long[numLevels];
+      setDelegate(drs);
       decayRpcSchedulerInfoBeanName =
           MBeans.register(namespace, "DecayRpcScheduler", this);
       this.registerMetrics2Source(namespace);
     }
 
     public static synchronized MetricsProxy getInstance(String namespace,
-        int numLevels) {
+        int numLevels, DecayRpcScheduler drs) {
       MetricsProxy mp = INSTANCES.get(namespace);
       if (mp == null) {
         // We must create one
-        mp = new MetricsProxy(namespace, numLevels);
+        mp = new MetricsProxy(namespace, numLevels, drs);
         INSTANCES.put(namespace, mp);
+      } else  if (drs != mp.delegate.get()){
+        // in case of delegate is reclaimed, we should set it again
+        mp.setDelegate(drs);
       }
       return mp;
     }

+ 29 - 3
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java

@@ -19,19 +19,22 @@
 package org.apache.hadoop.ipc;
 
 import static java.lang.Thread.sleep;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
 
 import org.junit.Test;
+
+import static org.junit.Assert.*;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.conf.Configuration;
 
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
 import java.lang.management.ManagementFactory;
 
 public class TestDecayRpcScheduler {
@@ -248,4 +251,27 @@ public class TestDecayRpcScheduler {
       sleep(10);
     }
   }
+
+  @Test(timeout=60000)
+  public void testNPEatInitialization() throws InterruptedException {
+    // redirect the LOG to and check if there is NPE message while initializing
+    // the DecayRpcScheduler
+    PrintStream output = System.out;
+    try {
+      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+      System.setOut(new PrintStream(bytes));
+
+      // initializing DefaultMetricsSystem here would set "monitoring" flag in
+      // MetricsSystemImpl to true
+      DefaultMetricsSystem.initialize("NameNode");
+      Configuration conf = new Configuration();
+      scheduler = new DecayRpcScheduler(1, "ns", conf);
+      // check if there is npe in log
+      assertFalse(bytes.toString().contains("NullPointerException"));
+    } finally {
+      //set systout back
+      System.setOut(output);
+    }
+
+  }
 }