Bläddra i källkod

YARN-11388: Prevent resource leaks in TestClientRMService. (#5187)

Signed-off-by: Shilun Fan <slfan1989@apache.org>
Chris Nauroth 2 år sedan
förälder
incheckning
6b67373d10

+ 160 - 190
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -57,9 +58,11 @@ import java.util.concurrent.CyclicBarrier;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Sets;
 import org.apache.hadoop.yarn.MockApps;
 import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
@@ -202,12 +205,17 @@ public class TestClientRMService {
       .getRecordFactory(null);
 
   private String appType = "MockApp";
-  
+
   private final static String QUEUE_1 = "Q-1";
   private final static String QUEUE_2 = "Q-2";
   private final static String APPLICATION_TAG_SC_PREPROCESSOR ="mytag:foo";
   private File resourceTypesFile = null;
 
+  private Configuration conf;
+  private ResourceManager resourceManager;
+  private YarnRPC rpc;
+  private ApplicationClientProtocol client;
+
   @Test
   public void testGetDecommissioningClusterNodes() throws Exception {
     MockRM rm = new MockRM() {
@@ -218,6 +226,7 @@ public class TestClientRMService {
             this.getRMContext().getRMDelegationTokenSecretManager());
       };
     };
+    resourceManager = rm;
     rm.start();
 
     int nodeMemory = 1024;
@@ -230,13 +239,12 @@ public class TestClientRMService {
     rm.waitForState(nm1.getNodeId(), NodeState.DECOMMISSIONING);
 
     // Create a client.
-    Configuration conf = new Configuration();
-    YarnRPC rpc = YarnRPC.create(conf);
+    conf = new Configuration();
+    rpc = YarnRPC.create(conf);
     InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress();
     LOG.info("Connecting to ResourceManager at " + rmAddress);
-    ApplicationClientProtocol client =
-        (ApplicationClientProtocol) rpc
-            .getProxy(ApplicationClientProtocol.class, rmAddress, conf);
+    client = (ApplicationClientProtocol) rpc.getProxy(
+        ApplicationClientProtocol.class, rmAddress, conf);
 
     // Make call
     List<NodeReport> nodeReports = client.getClusterNodes(
@@ -247,9 +255,6 @@ public class TestClientRMService {
     NodeReport nr = nodeReports.iterator().next();
     Assert.assertEquals(decommissioningTimeout, nr.getDecommissioningTimeout());
     Assert.assertNull(nr.getNodeUpdateType());
-
-    rpc.stopProxy(client, conf);
-    rm.close();
   }
 
   @Test
@@ -261,6 +266,7 @@ public class TestClientRMService {
           this.getRMContext().getRMDelegationTokenSecretManager());
       };
     };
+    resourceManager = rm;
     rm.start();
     RMNodeLabelsManager labelsMgr = rm.getRMContext().getNodeLabelManager();
     labelsMgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y"));
@@ -272,7 +278,7 @@ public class TestClientRMService {
     labelsMgr.replaceLabelsOnNode(map);
     rm.sendNodeStarted(node);
     node.nodeHeartbeat(true);
-    
+
     // Add and lose a node with label = y
     MockNM lostNode = rm.registerNode("host2:1235", 1024);
     rm.sendNodeStarted(lostNode);
@@ -281,13 +287,12 @@ public class TestClientRMService {
     rm.sendNodeLost(lostNode);
 
     // Create a client.
-    Configuration conf = new Configuration();
-    YarnRPC rpc = YarnRPC.create(conf);
+    conf = new Configuration();
+    rpc = YarnRPC.create(conf);
     InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress();
     LOG.info("Connecting to ResourceManager at " + rmAddress);
-    ApplicationClientProtocol client =
-        (ApplicationClientProtocol) rpc
-          .getProxy(ApplicationClientProtocol.class, rmAddress, conf);
+    client = (ApplicationClientProtocol) rpc.getProxy(
+        ApplicationClientProtocol.class, rmAddress, conf);
 
     // Make call
     GetClusterNodesRequest request =
@@ -297,7 +302,7 @@ public class TestClientRMService {
     Assert.assertEquals(1, nodeReports.size());
     Assert.assertNotSame("Node is expected to be healthy!", NodeState.UNHEALTHY,
         nodeReports.get(0).getNodeState());
-    
+
     // Check node's label = x
     Assert.assertTrue(nodeReports.get(0).getNodeLabels().contains("x"));
     Assert.assertNull(nodeReports.get(0).getDecommissioningTimeout());
@@ -311,34 +316,34 @@ public class TestClientRMService {
     nodeReports = client.getClusterNodes(request).getNodeReports();
     Assert.assertEquals("Unhealthy nodes should not show up by default", 0,
         nodeReports.size());
-    
+
     // Change label of host1 to y
     map = new HashMap<NodeId, Set<String>>();
     map.put(node.getNodeId(), ImmutableSet.of("y"));
     labelsMgr.replaceLabelsOnNode(map);
-    
+
     // Now query for UNHEALTHY nodes
     request = GetClusterNodesRequest.newInstance(EnumSet.of(NodeState.UNHEALTHY));
     nodeReports = client.getClusterNodes(request).getNodeReports();
     Assert.assertEquals(1, nodeReports.size());
     Assert.assertEquals("Node is expected to be unhealthy!", NodeState.UNHEALTHY,
         nodeReports.get(0).getNodeState());
-    
+
     Assert.assertTrue(nodeReports.get(0).getNodeLabels().contains("y"));
     Assert.assertNull(nodeReports.get(0).getDecommissioningTimeout());
     Assert.assertNull(nodeReports.get(0).getNodeUpdateType());
-    
+
     // Remove labels of host1
     map = new HashMap<NodeId, Set<String>>();
     map.put(node.getNodeId(), ImmutableSet.of("y"));
     labelsMgr.removeLabelsFromNode(map);
-    
+
     // Query all states should return all nodes
     rm.registerNode("host3:1236", 1024);
     request = GetClusterNodesRequest.newInstance(EnumSet.allOf(NodeState.class));
     nodeReports = client.getClusterNodes(request).getNodeReports();
     Assert.assertEquals(3, nodeReports.size());
-    
+
     // All host1-3's label should be empty (instead of null)
     for (NodeReport report : nodeReports) {
       Assert.assertTrue(report.getNodeLabels() != null
@@ -346,11 +351,8 @@ public class TestClientRMService {
       Assert.assertNull(report.getDecommissioningTimeout());
       Assert.assertNull(report.getNodeUpdateType());
     }
-
-    rpc.stopProxy(client, conf);
-    rm.close();
   }
-  
+
   @Test
   public void testNonExistingApplicationReport() throws YarnException {
     RMContext rmContext = mock(RMContext.class);
@@ -358,7 +360,6 @@ public class TestClientRMService {
         new ConcurrentHashMap<ApplicationId, RMApp>());
     ClientRMService rmService = new ClientRMService(rmContext, null, null,
         null, null, null);
-    RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
     GetApplicationReportRequest request = recordFactory
         .newRecordInstance(GetApplicationReportRequest.class);
     request.setApplicationId(ApplicationId.newInstance(0, 0));
@@ -373,7 +374,7 @@ public class TestClientRMService {
     }
   }
 
-   @Test
+  @Test
   public void testGetApplicationReport() throws Exception {
     ResourceScheduler scheduler = mock(ResourceScheduler.class);
     RMContext rmContext = mock(RMContext.class);
@@ -389,14 +390,13 @@ public class TestClientRMService {
     ClientRMService rmService = new ClientRMService(rmContext, scheduler,
         null, mockAclsManager, null, null);
     try {
-      RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
       GetApplicationReportRequest request = recordFactory
           .newRecordInstance(GetApplicationReportRequest.class);
       request.setApplicationId(appId1);
-      GetApplicationReportResponse response = 
+      GetApplicationReportResponse response =
           rmService.getApplicationReport(request);
       ApplicationReport report = response.getApplicationReport();
-      ApplicationResourceUsageReport usageReport = 
+      ApplicationResourceUsageReport usageReport =
           report.getApplicationResourceUsageReport();
       Assert.assertEquals(10, usageReport.getMemorySeconds());
       Assert.assertEquals(3, usageReport.getVcoreSeconds());
@@ -443,12 +443,11 @@ public class TestClientRMService {
       rmService.close();
     }
   }
-  
+
   @Test
   public void testGetApplicationAttemptReport() throws YarnException,
       IOException {
     ClientRMService rmService = createRMService();
-    RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
     GetApplicationAttemptReportRequest request = recordFactory
         .newRecordInstance(GetApplicationAttemptReportRequest.class);
     ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(
@@ -477,7 +476,7 @@ public class TestClientRMService {
           public void handle(Event event) {
           }
         });
-    ApplicationSubmissionContext asContext = 
+    ApplicationSubmissionContext asContext =
         mock(ApplicationSubmissionContext.class);
     YarnConfiguration config = new YarnConfiguration();
     RMAppAttemptImpl rmAppAttemptImpl = new RMAppAttemptImpl(attemptId,
@@ -490,7 +489,6 @@ public class TestClientRMService {
   @Test
   public void testGetApplicationAttempts() throws YarnException, IOException {
     ClientRMService rmService = createRMService();
-    RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
     GetApplicationAttemptsRequest request = recordFactory
         .newRecordInstance(GetApplicationAttemptsRequest.class);
     ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(
@@ -512,7 +510,6 @@ public class TestClientRMService {
   @Test
   public void testGetContainerReport() throws YarnException, IOException {
     ClientRMService rmService = createRMService();
-    RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
     GetContainerReportRequest request = recordFactory
         .newRecordInstance(GetContainerReportRequest.class);
     ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(
@@ -533,7 +530,6 @@ public class TestClientRMService {
   @Test
   public void testGetContainers() throws YarnException, IOException {
     ClientRMService rmService = createRMService();
-    RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
     GetContainersRequest request = recordFactory
         .newRecordInstance(GetContainersRequest.class);
     ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(
@@ -598,12 +594,13 @@ public class TestClientRMService {
 
   @Test
   public void testApplicationTagsValidation() throws IOException {
-    YarnConfiguration conf = new YarnConfiguration();
+    conf = new YarnConfiguration();
     int maxtags = 3, appMaxTagLength = 5;
     conf.setInt(YarnConfiguration.RM_APPLICATION_MAX_TAGS, maxtags);
     conf.setInt(YarnConfiguration.RM_APPLICATION_MAX_TAG_LENGTH,
         appMaxTagLength);
     MockRM rm = new MockRM(conf);
+    resourceManager = rm;
     rm.init(conf);
     rm.start();
 
@@ -625,7 +622,6 @@ public class TestClientRMService {
     tags = Arrays.asList("tãg1", "tag2#");
     validateApplicationTag(rmService, tags,
         "A tag can only have ASCII characters! Invalid tag - tãg1");
-    rm.close();
   }
 
   private void validateApplicationTag(ClientRMService rmService,
@@ -643,9 +639,10 @@ public class TestClientRMService {
 
   @Test
   public void testForceKillApplication() throws Exception {
-    YarnConfiguration conf = new YarnConfiguration();
+    conf = new YarnConfiguration();
     conf.setBoolean(MockRM.ENABLE_WEBAPP, true);
     MockRM rm = new MockRM(conf);
+    resourceManager = rm;
     rm.init(conf);
     rm.start();
 
@@ -701,7 +698,7 @@ public class TestClientRMService {
     assertEquals("Incorrect number of apps in the RM", 2,
         rmService.getApplications(getRequest).getApplicationList().size());
   }
-  
+
   @Test (expected = ApplicationNotFoundException.class)
   public void testMoveAbsentApplication() throws YarnException {
     RMContext rmContext = mock(RMContext.class);
@@ -1205,7 +1202,7 @@ public class TestClientRMService {
                         new byte[]{123, 123, 123, 123}));
     Server.getCurCall().set(mockCall);
   }
-  
+
   @Test (timeout = 30000)
   @SuppressWarnings ("rawtypes")
   public void testAppSubmit() throws Exception {
@@ -1353,7 +1350,7 @@ public class TestClientRMService {
     ApplicationId[] appIds =
         {getApplicationId(101), getApplicationId(102), getApplicationId(103)};
     List<String> tags = Arrays.asList("Tag1", "Tag2", "Tag3");
-    
+
     long[] submitTimeMillis = new long[3];
     // Submit applications
     for (int i = 0; i < appIds.length; i++) {
@@ -1380,23 +1377,23 @@ public class TestClientRMService {
     request.setLimit(1L);
     assertEquals("Failed to limit applications", 1,
         rmService.getApplications(request).getApplicationList().size());
-    
+
     // Check start range
     request = GetApplicationsRequest.newInstance();
     request.setStartRange(submitTimeMillis[0] + 1, System.currentTimeMillis());
-    
+
     // 2 applications are submitted after first timeMills
-    assertEquals("Incorrect number of matching start range", 
+    assertEquals("Incorrect number of matching start range",
         2, rmService.getApplications(request).getApplicationList().size());
-    
+
     // 1 application is submitted after the second timeMills
     request.setStartRange(submitTimeMillis[1] + 1, System.currentTimeMillis());
-    assertEquals("Incorrect number of matching start range", 
+    assertEquals("Incorrect number of matching start range",
         1, rmService.getApplications(request).getApplicationList().size());
-    
+
     // no application is submitted after the third timeMills
     request.setStartRange(submitTimeMillis[2] + 1, System.currentTimeMillis());
-    assertEquals("Incorrect number of matching start range", 
+    assertEquals("Incorrect number of matching start range",
         0, rmService.getApplications(request).getApplicationList().size());
 
     // Check queue
@@ -1468,7 +1465,7 @@ public class TestClientRMService {
     assertEquals("Incorrect number of applications for the scope", 3,
         rmService.getApplications(request).getApplicationList().size());
   }
-  
+
   @Test(timeout=4000)
   public void testConcurrentAppSubmit()
       throws IOException, InterruptedException, BrokenBarrierException,
@@ -1487,7 +1484,7 @@ public class TestClientRMService {
         appId1, null, null);
     final SubmitApplicationRequest submitRequest2 = mockSubmitAppRequest(
         appId2, null, null);
-    
+
     final CyclicBarrier startBarrier = new CyclicBarrier(2);
     final CyclicBarrier endBarrier = new CyclicBarrier(2);
 
@@ -1529,7 +1526,7 @@ public class TestClientRMService {
       }
     };
     t.start();
-    
+
     // submit another app, so go through while the first app is blocked
     startBarrier.await();
     rmService.submitApplication(submitRequest2);
@@ -1615,21 +1612,21 @@ public class TestClientRMService {
 
   private ConcurrentHashMap<ApplicationId, RMApp> getRMApps(
       RMContext rmContext, YarnScheduler yarnScheduler) {
-    ConcurrentHashMap<ApplicationId, RMApp> apps = 
-      new ConcurrentHashMap<ApplicationId, RMApp>();
+    ConcurrentHashMap<ApplicationId, RMApp> apps =
+        new ConcurrentHashMap<ApplicationId, RMApp>();
     ApplicationId applicationId1 = getApplicationId(1);
     ApplicationId applicationId2 = getApplicationId(2);
     ApplicationId applicationId3 = getApplicationId(3);
     YarnConfiguration config = new YarnConfiguration();
     apps.put(applicationId1, getRMApp(rmContext, yarnScheduler, applicationId1,
-        config, "testqueue", 10, 3,null,null));
+        config, "testqueue", 10, 3, null, null));
     apps.put(applicationId2, getRMApp(rmContext, yarnScheduler, applicationId2,
-        config, "a", 20, 2,null,""));
+        config, "a", 20, 2, null, ""));
     apps.put(applicationId3, getRMApp(rmContext, yarnScheduler, applicationId3,
-        config, "testqueue", 40, 5,"high-mem","high-mem"));
+        config, "testqueue", 40, 5, "high-mem", "high-mem"));
     return apps;
   }
-  
+
   private List<ApplicationAttemptId> getSchedulerApps(
       Map<ApplicationId, RMApp> apps) {
     List<ApplicationAttemptId> schedApps = new ArrayList<ApplicationAttemptId>();
@@ -1642,7 +1639,7 @@ public class TestClientRMService {
   private static ApplicationId getApplicationId(int id) {
     return ApplicationId.newInstance(123456, id);
   }
-  
+
   private static ApplicationAttemptId getApplicationAttemptId(int id) {
     return ApplicationAttemptId.newInstance(getApplicationId(id), 1);
   }
@@ -1667,7 +1664,7 @@ public class TestClientRMService {
                       String clientUserName, boolean allowAccess) {
                     ApplicationReport report = super.createAndGetApplicationReport(
                         clientUserName, allowAccess);
-                    ApplicationResourceUsageReport usageReport = 
+                    ApplicationResourceUsageReport usageReport =
                         report.getApplicationResourceUsageReport();
                     usageReport.setMemorySeconds(memorySeconds);
                     usageReport.setVcoreSeconds(vcoreSeconds);
@@ -1687,8 +1684,7 @@ public class TestClientRMService {
     RMContainerImpl containerimpl = spy(new RMContainerImpl(container,
         SchedulerRequestKey.extractFrom(container), attemptId, null, "",
         rmContext));
-    Map<ApplicationAttemptId, RMAppAttempt> attempts = 
-      new HashMap<ApplicationAttemptId, RMAppAttempt>();
+    Map<ApplicationAttemptId, RMAppAttempt> attempts = new HashMap<>();
     attempts.put(attemptId, rmAppAttemptImpl);
     when(app.getCurrentAppAttempt()).thenReturn(rmAppAttemptImpl);
     when(app.getAppAttempts()).thenReturn(attempts);
@@ -1750,6 +1746,7 @@ public class TestClientRMService {
         ResourceScheduler.class);
     conf.setBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE, true);
     MockRM rm = new MockRM(conf);
+    resourceManager = rm;
     rm.start();
     try {
       rm.registerNode("127.0.0.1:1", 102400, 100);
@@ -1790,8 +1787,8 @@ public class TestClientRMService {
 
   @Test
   public void testCreateReservation() {
-    ResourceManager rm = setupResourceManager();
-    ClientRMService clientService = rm.getClientRMService();
+    resourceManager = setupResourceManager();
+    ClientRMService clientService = resourceManager.getClientRMService();
     Clock clock = new UTCClock();
     long arrival = clock.getTime();
     long duration = 60000;
@@ -1821,14 +1818,12 @@ public class TestClientRMService {
     } catch (Exception e) {
       Assert.assertTrue(e instanceof YarnException);
     }
-
-    rm.stop();
   }
 
   @Test
   public void testUpdateReservation() {
-    ResourceManager rm = setupResourceManager();
-    ClientRMService clientService = rm.getClientRMService();
+    resourceManager = setupResourceManager();
+    ClientRMService clientService = resourceManager.getClientRMService();
     Clock clock = new UTCClock();
     long arrival = clock.getTime();
     long duration = 60000;
@@ -1857,14 +1852,12 @@ public class TestClientRMService {
     }
     Assert.assertNotNull(uResponse);
     System.out.println("Update reservation response: " + uResponse);
-
-    rm.stop();
   }
 
   @Test
   public void testListReservationsByReservationId() {
-    ResourceManager rm = setupResourceManager();
-    ClientRMService clientService = rm.getClientRMService();
+    resourceManager = setupResourceManager();
+    ClientRMService clientService = resourceManager.getClientRMService();
     Clock clock = new UTCClock();
     long arrival = clock.getTime();
     long duration = 60000;
@@ -1888,14 +1881,12 @@ public class TestClientRMService {
         .getReservationId().getId(), reservationID.getId());
     Assert.assertEquals(response.getReservationAllocationState().get(0)
         .getResourceAllocationRequests().size(), 0);
-
-    rm.stop();
   }
 
   @Test
   public void testListReservationsByTimeInterval() {
-    ResourceManager rm = setupResourceManager();
-    ClientRMService clientService = rm.getClientRMService();
+    resourceManager = setupResourceManager();
+    ClientRMService clientService = resourceManager.getClientRMService();
     Clock clock = new UTCClock();
     long arrival = clock.getTime();
     long duration = 60000;
@@ -1947,14 +1938,12 @@ public class TestClientRMService {
         reservationRequests.getInterpreter().toString());
     Assert.assertTrue(reservationRequests.getReservationResources().get(0)
         .getDuration() == duration);
-
-    rm.stop();
   }
 
   @Test
   public void testListReservationsByInvalidTimeInterval() {
-    ResourceManager rm = setupResourceManager();
-    ClientRMService clientService = rm.getClientRMService();
+    resourceManager = setupResourceManager();
+    ClientRMService clientService = resourceManager.getClientRMService();
     Clock clock = new UTCClock();
     long arrival = clock.getTime();
     long duration = 60000;
@@ -1991,14 +1980,12 @@ public class TestClientRMService {
     Assert.assertEquals(1, response.getReservationAllocationState().size());
     Assert.assertEquals(response.getReservationAllocationState().get(0)
         .getReservationId().getId(), sRequest.getReservationId().getId());
-
-    rm.stop();
   }
 
   @Test
   public void testListReservationsByTimeIntervalContainingNoReservations() {
-    ResourceManager rm = setupResourceManager();
-    ClientRMService clientService = rm.getClientRMService();
+    resourceManager = setupResourceManager();
+    ClientRMService clientService = resourceManager.getClientRMService();
     Clock clock = new UTCClock();
     long arrival = clock.getTime();
     long duration = 60000;
@@ -2073,14 +2060,12 @@ public class TestClientRMService {
     // Ensure all reservations are filtered out.
     Assert.assertNotNull(response);
     assertThat(response.getReservationAllocationState()).isEmpty();
-
-    rm.stop();
   }
 
   @Test
   public void testReservationDelete() {
-    ResourceManager rm = setupResourceManager();
-    ClientRMService clientService = rm.getClientRMService();
+    resourceManager = setupResourceManager();
+    ClientRMService clientService = resourceManager.getClientRMService();
     Clock clock = new UTCClock();
     long arrival = clock.getTime();
     long duration = 60000;
@@ -2114,8 +2099,6 @@ public class TestClientRMService {
     }
     Assert.assertNotNull(response);
     Assert.assertEquals(0, response.getReservationAllocationState().size());
-
-    rm.stop();
   }
 
   @Test
@@ -2128,6 +2111,7 @@ public class TestClientRMService {
                 .getRMDelegationTokenSecretManager());
       };
     };
+    resourceManager = rm;
     rm.start();
     NodeLabel labelX = NodeLabel.newInstance("x", false);
     NodeLabel labelY = NodeLabel.newInstance("y");
@@ -2142,12 +2126,12 @@ public class TestClientRMService {
     labelsMgr.replaceLabelsOnNode(map);
 
     // Create a client.
-    Configuration conf = new Configuration();
-    YarnRPC rpc = YarnRPC.create(conf);
+    conf = new Configuration();
+    rpc = YarnRPC.create(conf);
     InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress();
     LOG.info("Connecting to ResourceManager at " + rmAddress);
-    ApplicationClientProtocol client = (ApplicationClientProtocol) rpc
-        .getProxy(ApplicationClientProtocol.class, rmAddress, conf);
+    client = (ApplicationClientProtocol) rpc.getProxy(
+        ApplicationClientProtocol.class, rmAddress, conf);
 
     // Get node labels collection
     GetClusterNodeLabelsResponse response = client
@@ -2168,9 +2152,6 @@ public class TestClientRMService {
     // Below label "x" is not present in the response as exclusivity is true
     Assert.assertFalse(nodeToLabels.get(node1).containsAll(
         Arrays.asList(NodeLabel.newInstance("x"))));
-
-    rpc.stopProxy(client, conf);
-    rm.stop();
   }
 
   @Test
@@ -2183,6 +2164,7 @@ public class TestClientRMService {
                 .getRMDelegationTokenSecretManager());
       };
     };
+    resourceManager = rm;
     rm.start();
 
     NodeLabel labelX = NodeLabel.newInstance("x", false);
@@ -2205,12 +2187,12 @@ public class TestClientRMService {
     labelsMgr.replaceLabelsOnNode(map);
 
     // Create a client.
-    Configuration conf = new Configuration();
-    YarnRPC rpc = YarnRPC.create(conf);
+    conf = new Configuration();
+    rpc = YarnRPC.create(conf);
     InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress();
     LOG.info("Connecting to ResourceManager at " + rmAddress);
-    ApplicationClientProtocol client = (ApplicationClientProtocol) rpc
-        .getProxy(ApplicationClientProtocol.class, rmAddress, conf);
+    client = (ApplicationClientProtocol) rpc.getProxy(
+        ApplicationClientProtocol.class, rmAddress, conf);
 
     // Get node labels collection
     GetClusterNodeLabelsResponse response = client
@@ -2244,9 +2226,6 @@ public class TestClientRMService {
     Assert.assertTrue(labelsToNodes.get(labelZ.getName()).containsAll(
         Arrays.asList(node1B, node3B)));
     assertThat(labelsToNodes.get(labelY.getName())).isNull();
-
-    rpc.stopProxy(client, conf);
-    rm.close();
   }
 
   @Test(timeout = 120000)
@@ -2259,6 +2238,7 @@ public class TestClientRMService {
             this.getRMContext().getRMDelegationTokenSecretManager());
       }
     };
+    resourceManager = rm;
     rm.start();
 
     NodeAttributesManager mgr = rm.getRMContext().getNodeAttributesManager();
@@ -2278,12 +2258,12 @@ public class TestClientRMService {
     nodes.put(host2.getHost(), ImmutableSet.of(docker));
     mgr.addNodeAttributes(nodes);
     // Create a client.
-    Configuration conf = new Configuration();
-    YarnRPC rpc = YarnRPC.create(conf);
+    conf = new Configuration();
+    rpc = YarnRPC.create(conf);
     InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress();
     LOG.info("Connecting to ResourceManager at " + rmAddress);
-    ApplicationClientProtocol client = (ApplicationClientProtocol) rpc
-        .getProxy(ApplicationClientProtocol.class, rmAddress, conf);
+    client = (ApplicationClientProtocol) rpc.getProxy(
+        ApplicationClientProtocol.class, rmAddress, conf);
 
     GetClusterNodeAttributesRequest request =
         GetClusterNodeAttributesRequest.newInstance();
@@ -2295,8 +2275,6 @@ public class TestClientRMService {
     Assert.assertTrue(attributes.contains(NodeAttributeInfo.newInstance(os)));
     Assert
         .assertTrue(attributes.contains(NodeAttributeInfo.newInstance(docker)));
-    rpc.stopProxy(client, conf);
-    rm.close();
   }
 
   @Test(timeout = 120000)
@@ -2309,6 +2287,7 @@ public class TestClientRMService {
             this.getRMContext().getRMDelegationTokenSecretManager());
       }
     };
+    resourceManager = rm;
     rm.start();
 
     NodeAttributesManager mgr = rm.getRMContext().getNodeAttributesManager();
@@ -2331,12 +2310,12 @@ public class TestClientRMService {
     nodes.put(node2, ImmutableSet.of(docker, dist));
     mgr.addNodeAttributes(nodes);
     // Create a client.
-    Configuration conf = new Configuration();
-    YarnRPC rpc = YarnRPC.create(conf);
+    conf = new Configuration();
+    rpc = YarnRPC.create(conf);
     InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress();
     LOG.info("Connecting to ResourceManager at " + rmAddress);
-    ApplicationClientProtocol client = (ApplicationClientProtocol) rpc
-        .getProxy(ApplicationClientProtocol.class, rmAddress, conf);
+    client = (ApplicationClientProtocol) rpc.getProxy(
+        ApplicationClientProtocol.class, rmAddress, conf);
 
     GetAttributesToNodesRequest request =
         GetAttributesToNodesRequest.newInstance();
@@ -2377,8 +2356,6 @@ public class TestClientRMService {
         attrs3.get(os.getAttributeKey())));
     Assert.assertTrue(findHostnameAndValInMapping(node2, "docker0",
         attrs3.get(docker.getAttributeKey())));
-    rpc.stopProxy(client, conf);
-    rm.close();
   }
 
   private boolean findHostnameAndValInMapping(String hostname, String attrVal,
@@ -2401,6 +2378,7 @@ public class TestClientRMService {
             this.getRMContext().getRMDelegationTokenSecretManager());
       }
     };
+    resourceManager = rm;
     rm.start();
 
     NodeAttributesManager mgr = rm.getRMContext().getNodeAttributesManager();
@@ -2423,12 +2401,12 @@ public class TestClientRMService {
     nodes.put(node2, ImmutableSet.of(docker, dist));
     mgr.addNodeAttributes(nodes);
     // Create a client.
-    Configuration conf = new Configuration();
-    YarnRPC rpc = YarnRPC.create(conf);
+    conf = new Configuration();
+    rpc = YarnRPC.create(conf);
     InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress();
     LOG.info("Connecting to ResourceManager at " + rmAddress);
-    ApplicationClientProtocol client = (ApplicationClientProtocol) rpc
-        .getProxy(ApplicationClientProtocol.class, rmAddress, conf);
+    client = (ApplicationClientProtocol) rpc.getProxy(
+        ApplicationClientProtocol.class, rmAddress, conf);
 
     // Specify null for hostnames.
     GetNodesToAttributesRequest request1 =
@@ -2471,8 +2449,6 @@ public class TestClientRMService {
         client.getNodesToAttributes(request4);
     hostToAttrs = response4.getNodeToAttributes();
     Assert.assertEquals(0, hostToAttrs.size());
-    rpc.stopProxy(client, conf);
-    rm.close();
   }
 
   @Test(timeout = 120000)
@@ -2480,13 +2456,14 @@ public class TestClientRMService {
       throws Exception {
     int maxPriority = 10;
     int appPriority = 5;
-    YarnConfiguration conf = new YarnConfiguration();
+    conf = new YarnConfiguration();
     Assume.assumeFalse("FairScheduler does not support Application Priorities",
         conf.get(YarnConfiguration.RM_SCHEDULER)
             .equals(FairScheduler.class.getName()));
     conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY,
         maxPriority);
     MockRM rm = new MockRM(conf);
+    resourceManager = rm;
     rm.init(conf);
     rm.start();
     MockRMAppSubmissionData data = MockRMAppSubmissionData.Builder
@@ -2498,20 +2475,20 @@ public class TestClientRMService {
     testApplicationPriorityUpdation(rmService, app1, appPriority, appPriority);
     rm.killApp(app1.getApplicationId());
     rm.waitForState(app1.getApplicationId(), RMAppState.KILLED);
-    rm.stop();
   }
 
   @Test(timeout = 120000)
   public void testUpdateApplicationPriorityRequest() throws Exception {
     int maxPriority = 10;
     int appPriority = 5;
-    YarnConfiguration conf = new YarnConfiguration();
+    conf = new YarnConfiguration();
     Assume.assumeFalse("FairScheduler does not support Application Priorities",
         conf.get(YarnConfiguration.RM_SCHEDULER)
             .equals(FairScheduler.class.getName()));
     conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY,
         maxPriority);
     MockRM rm = new MockRM(conf);
+    resourceManager = rm;
     rm.init(conf);
     rm.start();
     rm.registerNode("host1:1234", 1024);
@@ -2555,8 +2532,6 @@ public class TestClientRMService {
     Assert.assertEquals("Incorrect priority has been set to application",
         appPriority, rmService.updateApplicationPriority(updateRequest)
             .getApplicationPriority().getPriority());
-
-    rm.stop();
   }
 
   private void testApplicationPriorityUpdation(ClientRMService rmService,
@@ -2576,55 +2551,53 @@ public class TestClientRMService {
         updateApplicationPriority.getApplicationPriority().getPriority());
   }
 
-  private void createExcludeFile(String filename) throws IOException {
-    File file = new File(filename);
-    if (file.exists()) {
-      file.delete();
+  private File createExcludeFile(File testDir) throws IOException {
+    File excludeFile = new File(testDir, "excludeFile");
+    try (FileOutputStream out = new FileOutputStream(excludeFile)) {
+      out.write("decommisssionedHost".getBytes(UTF_8));
     }
-
-    FileOutputStream out = new FileOutputStream(file);
-    out.write("decommisssionedHost".getBytes());
-    out.close();
+    return excludeFile;
   }
 
   @Test
   public void testRMStartWithDecommissionedNode() throws Exception {
-    String excludeFile = "excludeFile";
-    createExcludeFile(excludeFile);
-    YarnConfiguration conf = new YarnConfiguration();
-    conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
-        excludeFile);
-    MockRM rm = new MockRM(conf) {
-      protected ClientRMService createClientRMService() {
-        return new ClientRMService(this.rmContext, scheduler,
-            this.rmAppManager, this.applicationACLsManager, this.queueACLsManager,
-            this.getRMContext().getRMDelegationTokenSecretManager());
+    File testDir = GenericTestUtils.getRandomizedTestDir();
+    assertTrue("Failed to create test directory: " + testDir.getAbsolutePath(), testDir.mkdirs());
+    try {
+      File excludeFile = createExcludeFile(testDir);
+      conf = new YarnConfiguration();
+      conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
+          excludeFile.getAbsolutePath());
+      MockRM rm = new MockRM(conf) {
+        protected ClientRMService createClientRMService() {
+          return new ClientRMService(this.rmContext, scheduler,
+              this.rmAppManager, this.applicationACLsManager, this.queueACLsManager,
+              this.getRMContext().getRMDelegationTokenSecretManager());
+        };
       };
-    };
-    rm.start();
-
-    YarnRPC rpc = YarnRPC.create(conf);
-    InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress();
-    LOG.info("Connecting to ResourceManager at " + rmAddress);
-    ApplicationClientProtocol client =
-        (ApplicationClientProtocol) rpc
-            .getProxy(ApplicationClientProtocol.class, rmAddress, conf);
-
-    // Make call
-    GetClusterNodesRequest request =
-        GetClusterNodesRequest.newInstance(EnumSet.allOf(NodeState.class));
-    List<NodeReport> nodeReports = client.getClusterNodes(request).getNodeReports();
-    Assert.assertEquals(1, nodeReports.size());
-
-    rm.stop();
-    rpc.stopProxy(client, conf);
-    new File(excludeFile).delete();
+      resourceManager = rm;
+      rm.start();
+
+      rpc = YarnRPC.create(conf);
+      InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress();
+      LOG.info("Connecting to ResourceManager at " + rmAddress);
+      client = (ApplicationClientProtocol) rpc.getProxy(
+          ApplicationClientProtocol.class, rmAddress, conf);
+
+      // Make call
+      GetClusterNodesRequest request =
+          GetClusterNodesRequest.newInstance(EnumSet.allOf(NodeState.class));
+      List<NodeReport> nodeReports = client.getClusterNodes(request).getNodeReports();
+      assertEquals(1, nodeReports.size());
+    } finally {
+      FileUtil.fullyDelete(testDir);
+    }
   }
 
   @Test
   public void testGetResourceTypesInfoWhenResourceProfileDisabled()
       throws Exception {
-    YarnConfiguration conf = new YarnConfiguration();
+    conf = new YarnConfiguration();
     MockRM rm = new MockRM(conf) {
       protected ClientRMService createClientRMService() {
         return new ClientRMService(this.rmContext, scheduler,
@@ -2632,14 +2605,14 @@ public class TestClientRMService {
             this.getRMContext().getRMDelegationTokenSecretManager());
       }
     };
+    resourceManager = rm;
     rm.start();
 
-    YarnRPC rpc = YarnRPC.create(conf);
+    rpc = YarnRPC.create(conf);
     InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress();
     LOG.info("Connecting to ResourceManager at " + rmAddress);
-    ApplicationClientProtocol client =
-        (ApplicationClientProtocol) rpc
-            .getProxy(ApplicationClientProtocol.class, rmAddress, conf);
+    client = (ApplicationClientProtocol) rpc.getProxy(
+        ApplicationClientProtocol.class, rmAddress, conf);
 
     // Make call
     GetAllResourceTypeInfoRequest request =
@@ -2659,9 +2632,6 @@ public class TestClientRMService {
         response.getResourceTypeInfo().get(1).getName());
     Assert.assertEquals(ResourceInformation.VCORES.getUnits(),
         response.getResourceTypeInfo().get(1).getDefaultUnit());
-
-    rm.stop();
-    rpc.stopProxy(client, conf);
   }
 
   @Test
@@ -2755,6 +2725,7 @@ public class TestClientRMService {
           this.getRMContext().getRMDelegationTokenSecretManager());
       };
     };
+    resourceManager = rm;
     rm.start();
 
     Resource resource = BuilderUtils.newResource(1024, 1);
@@ -2769,13 +2740,12 @@ public class TestClientRMService {
     node.nodeHeartbeat(true);
 
     // Create a client.
-    Configuration conf = new Configuration();
-    YarnRPC rpc = YarnRPC.create(conf);
+    conf = new Configuration();
+    rpc = YarnRPC.create(conf);
     InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress();
     LOG.info("Connecting to ResourceManager at " + rmAddress);
-    ApplicationClientProtocol client =
-        (ApplicationClientProtocol) rpc
-          .getProxy(ApplicationClientProtocol.class, rmAddress, conf);
+    client = (ApplicationClientProtocol) rpc.getProxy(
+        ApplicationClientProtocol.class, rmAddress, conf);
 
     // Make call
     GetClusterNodesRequest request =
@@ -2807,9 +2777,6 @@ public class TestClientRMService {
         getResourceInformation("memory-mb").getUnits());
     Assert.assertEquals(976562, nodeReports.get(0).getCapability().
         getResourceInformation("memory-mb").getValue());
-
-    rpc.stopProxy(client, conf);
-    rm.close();
   }
 
   @Test
@@ -2821,6 +2788,7 @@ public class TestClientRMService {
           this.getRMContext().getRMDelegationTokenSecretManager());
       };
     };
+    resourceManager = rm;
     rm.start();
 
     ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics();
@@ -2833,13 +2801,12 @@ public class TestClientRMService {
     repeat(7, clusterMetrics::incrNumShutdownNMs);
 
     // Create a client.
-    Configuration conf = new Configuration();
-    YarnRPC rpc = YarnRPC.create(conf);
+    conf = new Configuration();
+    rpc = YarnRPC.create(conf);
     InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress();
     LOG.info("Connecting to ResourceManager at " + rmAddress);
-    ApplicationClientProtocol client =
-        (ApplicationClientProtocol) rpc
-          .getProxy(ApplicationClientProtocol.class, rmAddress, conf);
+    client = (ApplicationClientProtocol) rpc.getProxy(
+        ApplicationClientProtocol.class, rmAddress, conf);
 
     YarnClusterMetrics ymetrics = client.getClusterMetrics(
         GetClusterMetricsRequest.newInstance()).getClusterMetrics();
@@ -2852,18 +2819,21 @@ public class TestClientRMService {
     Assert.assertEquals(5, ymetrics.getNumUnhealthyNodeManagers());
     Assert.assertEquals(6, ymetrics.getNumRebootedNodeManagers());
     Assert.assertEquals(7, ymetrics.getNumShutdownNodeManagers());
-
-    rpc.stopProxy(client, conf);
-    rm.close();
   }
 
   @After
-  public void tearDown(){
+  public void tearDown() throws Exception {
     if (resourceTypesFile != null && resourceTypesFile.exists()) {
       resourceTypesFile.delete();
     }
     ClusterMetrics.destroy();
     DefaultMetricsSystem.shutdown();
+    if (conf != null && client != null && rpc != null) {
+      rpc.stopProxy(client, conf);
+    }
+    if (resourceManager != null) {
+      resourceManager.close();
+    }
   }
 
   private static void repeat(int n, Runnable r) {