Jelajahi Sumber

YARN-7402. BackPort [GPG] Fix potential connection leak in GPGUtils. (#5901)

slfan1989 1 tahun lalu
induk
melakukan
6d3bcaa674

+ 21 - 11
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java

@@ -18,20 +18,21 @@
 
 package org.apache.hadoop.yarn.server.globalpolicygenerator;
 
+import static javax.servlet.http.HttpServletResponse.SC_OK;
+
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 
-import javax.servlet.http.HttpServletResponse;
 import javax.ws.rs.core.MediaType;
 
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
 
 import com.sun.jersey.api.client.Client;
 import com.sun.jersey.api.client.ClientResponse;
 import com.sun.jersey.api.client.WebResource;
-import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
-import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
 
 /**
  * GPGUtils contains utility functions for the GPG.
@@ -57,15 +58,24 @@ public final class GPGUtils {
     T obj = null;
 
     WebResource webResource = client.resource(webAddr);
-    ClientResponse response = webResource.path("ws/v1/cluster").path(path)
-        .accept(MediaType.APPLICATION_XML).get(ClientResponse.class);
-    if (response.getStatus() == HttpServletResponse.SC_OK) {
-      obj = response.getEntity(returnType);
-    } else {
-      throw new YarnRuntimeException("Bad response from remote web service: "
-          + response.getStatus());
+    ClientResponse response = null;
+    try {
+      response = webResource.path("ws/v1/cluster").path(path)
+          .accept(MediaType.APPLICATION_XML).get(ClientResponse.class);
+      if (response.getStatus() == SC_OK) {
+        obj = response.getEntity(returnType);
+      } else {
+        throw new YarnRuntimeException(
+            "Bad response from remote web service: " + response.getStatus());
+      }
+      return obj;
+    } finally {
+      if (response != null) {
+        response.close();
+        response = null;
+      }
+      client.destroy();
     }
-    return obj;
   }
 
   /**

+ 50 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/TestPolicyGenerator.java

@@ -48,6 +48,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedule
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfoList;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfo;
 import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
 import org.junit.After;
 import org.junit.Assert;
@@ -292,11 +294,56 @@ public class TestPolicyGenerator {
     resourceManager.start();
 
     String rmAddress = WebAppUtils.getRMWebAppURLWithScheme(this.conf);
-    SchedulerTypeInfo sti = GPGUtils
-        .invokeRMWebService(rmAddress, RMWSConsts.SCHEDULER,
-            SchedulerTypeInfo.class);
+    SchedulerTypeInfo sti = GPGUtils.invokeRMWebService(rmAddress, RMWSConsts.SCHEDULER,
+        SchedulerTypeInfo.class);
 
     Assert.assertNotNull(sti);
+    SchedulerInfo schedulerInfo = sti.getSchedulerInfo();
+    Assert.assertTrue(schedulerInfo instanceof CapacitySchedulerInfo);
+
+    CapacitySchedulerInfo capacitySchedulerInfo = (CapacitySchedulerInfo) schedulerInfo;
+    Assert.assertNotNull(capacitySchedulerInfo);
+
+    CapacitySchedulerQueueInfoList queues = capacitySchedulerInfo.getQueues();
+    Assert.assertNotNull(queues);
+    ArrayList<CapacitySchedulerQueueInfo> queueInfoList = queues.getQueueInfoList();
+    Assert.assertNotNull(queueInfoList);
+    Assert.assertEquals(2, queueInfoList.size());
+
+    CapacitySchedulerQueueInfo queueA = queueInfoList.get(0);
+    Assert.assertNotNull(queueA);
+    Assert.assertEquals("root.a", queueA.getQueuePath());
+    Assert.assertEquals(10.5f, queueA.getCapacity(), 0.00001);
+    CapacitySchedulerQueueInfoList queueAQueues = queueA.getQueues();
+    Assert.assertNotNull(queueAQueues);
+    ArrayList<CapacitySchedulerQueueInfo> queueInfoAList = queueAQueues.getQueueInfoList();
+    Assert.assertNotNull(queueInfoAList);
+    Assert.assertEquals(2, queueInfoAList.size());
+    CapacitySchedulerQueueInfo queueA1 = queueInfoAList.get(0);
+    Assert.assertNotNull(queueA1);
+    Assert.assertEquals(30f, queueA1.getCapacity(), 0.00001);
+    CapacitySchedulerQueueInfo queueA2 = queueInfoAList.get(1);
+    Assert.assertNotNull(queueA2);
+    Assert.assertEquals(70f, queueA2.getCapacity(), 0.00001);
+
+    CapacitySchedulerQueueInfo queueB = queueInfoList.get(1);
+    Assert.assertNotNull(queueB);
+    Assert.assertEquals("root.b", queueB.getQueuePath());
+    Assert.assertEquals(89.5f, queueB.getCapacity(), 0.00001);
+    CapacitySchedulerQueueInfoList queueBQueues = queueB.getQueues();
+    Assert.assertNotNull(queueBQueues);
+    ArrayList<CapacitySchedulerQueueInfo> queueInfoBList = queueBQueues.getQueueInfoList();
+    Assert.assertNotNull(queueInfoBList);
+    Assert.assertEquals(3, queueInfoBList.size());
+    CapacitySchedulerQueueInfo queueB1 = queueInfoBList.get(0);
+    Assert.assertNotNull(queueB1);
+    Assert.assertEquals(79.2f, queueB1.getCapacity(), 0.00001);
+    CapacitySchedulerQueueInfo queueB2 = queueInfoBList.get(1);
+    Assert.assertNotNull(queueB2);
+    Assert.assertEquals(0.8f, queueB2.getCapacity(), 0.00001);
+    CapacitySchedulerQueueInfo queueB3 = queueInfoBList.get(2);
+    Assert.assertNotNull(queueB3);
+    Assert.assertEquals(20f, queueB3.getCapacity(), 0.00001);
   }
 
   /**