Bläddra i källkod

YARN-8893. [AMRMProxy] Fix thread leak in AMRMClientRelayer and UAM client. Contributed by Botong Huang.

Giovanni Matteo Fumarola 6 år sedan
förälder
incheckning
81da8b262b
9 ändrade filer med 102 tillägg och 76 borttagningar
  1. 12 43
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java
  2. 28 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java
  3. 21 7
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
  4. 1 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
  5. 6 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/TestAMRMClientRelayer.java
  6. 0 6
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/metrics/TestAMRMClientRelayerMetrics.java
  7. 25 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java
  8. 9 9
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
  9. 0 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java

+ 12 - 43
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java

@@ -27,9 +27,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
@@ -46,8 +45,6 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
 import org.apache.hadoop.yarn.api.records.UpdatedContainer;
 import org.apache.hadoop.yarn.client.AMRMClientUtils;
-import org.apache.hadoop.yarn.client.ClientRMProxy;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
 import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -65,8 +62,7 @@ import com.google.common.annotations.VisibleForTesting;
  * pending requests similar to AMRMClient, and handles RM re-sync automatically
  * without propagate the re-sync exception back to AMRMClient.
  */
-public class AMRMClientRelayer extends AbstractService
-    implements ApplicationMasterProtocol {
+public class AMRMClientRelayer implements ApplicationMasterProtocol {
   private static final Logger LOG =
       LoggerFactory.getLogger(AMRMClientRelayer.class);
 
@@ -131,51 +127,16 @@ public class AMRMClientRelayer extends AbstractService
 
   private AMRMClientRelayerMetrics metrics;
 
-  public AMRMClientRelayer() {
-    super(AMRMClientRelayer.class.getName());
+  public AMRMClientRelayer(ApplicationMasterProtocol rmClient,
+      ApplicationId appId, String rmId) {
     this.resetResponseId = -1;
     this.metrics = AMRMClientRelayerMetrics.getInstance();
-    this.rmClient = null;
-    this.appId = null;
     this.rmId = "";
-  }
-
-  public AMRMClientRelayer(ApplicationMasterProtocol rmClient,
-      ApplicationId appId, String rmId) {
-    this();
     this.rmClient = rmClient;
     this.appId = appId;
     this.rmId = rmId;
   }
 
-  @Override
-  protected void serviceInit(Configuration conf) throws Exception {
-    super.serviceInit(conf);
-  }
-
-  @Override
-  protected void serviceStart() throws Exception {
-    final YarnConfiguration conf = new YarnConfiguration(getConfig());
-    try {
-      if (this.rmClient == null) {
-        this.rmClient =
-            ClientRMProxy.createRMProxy(conf, ApplicationMasterProtocol.class);
-      }
-    } catch (IOException e) {
-      throw new YarnRuntimeException(e);
-    }
-    super.serviceStart();
-  }
-
-  @Override
-  protected void serviceStop() throws Exception {
-    if (this.rmClient != null) {
-      RPC.stopProxy(this.rmClient);
-    }
-    shutdown();
-    super.serviceStop();
-  }
-
   public void setAMRegistrationRequest(
       RegisterApplicationMasterRequest registerRequest) {
     this.amRegistrationRequest = registerRequest;
@@ -226,6 +187,14 @@ public class AMRMClientRelayer extends AbstractService
             .decrClientPending(rmId, req.getContainerUpdateType(), 1);
       }
     }
+
+    if (this.rmClient != null) {
+      try {
+        RPC.stopProxy(this.rmClient);
+        this.rmClient = null;
+      } catch (HadoopIllegalArgumentException e) {
+      }
+    }
   }
 
   @Override

+ 28 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java

@@ -370,6 +370,34 @@ public class UnmanagedAMPoolManager extends AbstractService {
     return response;
   }
 
+  /**
+   * Shutdown an UAM client without killing it in YarnRM.
+   *
+   * @param uamId uam Id
+   * @throws YarnException if fails
+   */
+  public void shutDownConnections(String uamId)
+      throws YarnException {
+    if (!this.unmanagedAppMasterMap.containsKey(uamId)) {
+      throw new YarnException("UAM " + uamId + " does not exist");
+    }
+    LOG.info(
+        "Shutting down UAM id {} for application {} without killing the UAM",
+        uamId, this.appIdMap.get(uamId));
+    this.unmanagedAppMasterMap.remove(uamId).shutDownConnections();
+  }
+
+  /**
+   * Shutdown all UAM clients without killing them in YarnRM.
+   *
+   * @throws YarnException if fails
+   */
+  public void shutDownConnections() throws YarnException {
+    for (String uamId : this.unmanagedAppMasterMap.keySet()) {
+      shutDownConnections(uamId);
+    }
+  }
+
   /**
    * Get the id of all running UAMs.
    *

+ 21 - 7
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java

@@ -255,9 +255,6 @@ public class UnmanagedApplicationManager {
   public FinishApplicationMasterResponse finishApplicationMaster(
       FinishApplicationMasterRequest request)
       throws YarnException, IOException {
-
-    this.heartbeatHandler.shutdown();
-
     if (this.userUgi == null) {
       if (this.connectionInitiated) {
         // This is possible if the async launchUAM is still
@@ -270,7 +267,12 @@ public class UnmanagedApplicationManager {
             + "be called before createAndRegister");
       }
     }
-    return this.rmProxyRelayer.finishApplicationMaster(request);
+    FinishApplicationMasterResponse response =
+        this.rmProxyRelayer.finishApplicationMaster(request);
+    if (response.getIsUnregistered()) {
+      shutDownConnections();
+    }
+    return response;
   }
 
   /**
@@ -282,11 +284,10 @@ public class UnmanagedApplicationManager {
    */
   public KillApplicationResponse forceKillApplication()
       throws IOException, YarnException {
+    shutDownConnections();
+
     KillApplicationRequest request =
         KillApplicationRequest.newInstance(this.applicationId);
-
-    this.heartbeatHandler.shutdown();
-
     if (this.rmClient == null) {
       this.rmClient = createRMProxy(ApplicationClientProtocol.class, this.conf,
           UserGroupInformation.createRemoteUser(this.submitter), null);
@@ -323,6 +324,14 @@ public class UnmanagedApplicationManager {
     }
   }
 
+  /**
+   * Shutdown this UAM client, without killing the UAM in the YarnRM side.
+   */
+  public void shutDownConnections() {
+    this.heartbeatHandler.shutdown();
+    this.rmProxyRelayer.shutdown();
+  }
+
   /**
    * Returns the application id of the UAM.
    *
@@ -532,4 +541,9 @@ public class UnmanagedApplicationManager {
   protected void drainHeartbeatThread() {
     this.heartbeatHandler.drainHeartbeatThread();
   }
+
+  @VisibleForTesting
+  protected boolean isHeartbeatThreadAlive() {
+    return this.heartbeatHandler.isAlive();
+  }
 }

+ 1 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java

@@ -108,7 +108,6 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerReport;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeLabel;
@@ -323,9 +322,7 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
       applicationContainerIdMap.remove(appId);
     }
 
-    return FinishApplicationMasterResponse.newInstance(
-        request.getFinalApplicationStatus() == FinalApplicationStatus.SUCCEEDED
-            ? true : false);
+    return FinishApplicationMasterResponse.newInstance(true);
   }
 
   protected ApplicationId getApplicationId(int id) {

+ 6 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/TestAMRMClientRelayer.java

@@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestExceptio
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.scheduler.ResourceRequestSet;
 import org.apache.hadoop.yarn.util.Records;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -155,16 +156,17 @@ public class TestAMRMClientRelayer {
 
     this.mockAMS = new MockApplicationMasterService();
     this.relayer = new AMRMClientRelayer(this.mockAMS, null, "TEST");
-
-    this.relayer.init(conf);
-    this.relayer.start();
-
     this.relayer.registerApplicationMaster(
         RegisterApplicationMasterRequest.newInstance("", 0, ""));
 
     clearAllocateRequestLists();
   }
 
+  @After
+  public void cleanup() {
+    this.relayer.shutdown();
+  }
+
   private void assertAsksAndReleases(int expectedAsk, int expectedRelease) {
     Assert.assertEquals(expectedAsk, this.mockAMS.lastAsk.size());
     Assert.assertEquals(expectedRelease, this.mockAMS.lastRelease.size());

+ 0 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/metrics/TestAMRMClientRelayerMetrics.java

@@ -141,17 +141,11 @@ public class TestAMRMClientRelayerMetrics {
 
     this.homeRelayer = new AMRMClientRelayer(this.mockAMS,
         ApplicationId.newInstance(0, 0), this.homeID);
-    this.homeRelayer.init(conf);
-    this.homeRelayer.start();
-
     this.homeRelayer.registerApplicationMaster(
         RegisterApplicationMasterRequest.newInstance("", 0, ""));
 
     this.uamRelayer = new AMRMClientRelayer(this.mockAMS,
         ApplicationId.newInstance(0, 0), this.uamID);
-    this.uamRelayer.init(conf);
-    this.uamRelayer.start();
-
     this.uamRelayer.registerApplicationMaster(
         RegisterApplicationMasterRequest.newInstance("", 0, ""));
 

+ 25 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java

@@ -87,7 +87,7 @@ public class TestUnmanagedApplicationManager {
     }
   }
 
-  @Test(timeout = 5000)
+  @Test(timeout = 10000)
   public void testBasicUsage()
       throws YarnException, IOException, InterruptedException {
 
@@ -104,6 +104,11 @@ public class TestUnmanagedApplicationManager {
     finishApplicationMaster(
         FinishApplicationMasterRequest.newInstance(null, null, null),
         attemptId);
+
+    while (uam.isHeartbeatThreadAlive()) {
+      LOG.info("waiting for heartbeat thread to finish");
+      Thread.sleep(100);
+    }
   }
 
   /*
@@ -261,7 +266,7 @@ public class TestUnmanagedApplicationManager {
         attemptId);
   }
 
-  @Test
+  @Test(timeout = 10000)
   public void testForceKill()
       throws YarnException, IOException, InterruptedException {
     launchUAM(attemptId);
@@ -269,6 +274,11 @@ public class TestUnmanagedApplicationManager {
         RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId);
     uam.forceKillApplication();
 
+    while (uam.isHeartbeatThreadAlive()) {
+      LOG.info("waiting for heartbeat thread to finish");
+      Thread.sleep(100);
+    }
+
     try {
       uam.forceKillApplication();
       Assert.fail("Should fail because application is already killed");
@@ -276,6 +286,19 @@ public class TestUnmanagedApplicationManager {
     }
   }
 
+  @Test(timeout = 10000)
+  public void testShutDownConnections()
+      throws YarnException, IOException, InterruptedException {
+    launchUAM(attemptId);
+    registerApplicationMaster(
+        RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId);
+    uam.shutDownConnections();
+    while (uam.isHeartbeatThreadAlive()) {
+      LOG.info("waiting for heartbeat thread to finish");
+      Thread.sleep(100);
+    }
+  }
+
   protected UserGroupInformation getUGIWithToken(
       ApplicationAttemptId appAttemptId) {
     UserGroupInformation ugi =

+ 9 - 9
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java

@@ -716,12 +716,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
                   uamPool.finishApplicationMaster(subClusterId, finishRequest);
 
               if (uamResponse.getIsUnregistered()) {
-                AMRMClientRelayer relayer =
-                    secondaryRelayers.remove(subClusterId);
-                if(relayer != null) {
-                  relayer.shutdown();
-                }
-
+                secondaryRelayers.remove(subClusterId);
                 if (getNMStateStore() != null) {
                   getNMStateStore().removeAMRMProxyAppContextEntry(attemptId,
                       NMSS_SECONDARY_SC_PREFIX + subClusterId);
@@ -801,8 +796,16 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
    */
   @Override
   public void shutdown() {
+    LOG.info("Shutting down FederationInterceptor for {}", this.attemptId);
+
     // Do not stop uamPool service and kill UAMs here because of possible second
     // app attempt
+    try {
+      this.uamPool.shutDownConnections();
+    } catch (YarnException e) {
+      LOG.error("Error shutting down all UAM clients without killing them", e);
+    }
+
     if (this.threadpool != null) {
       try {
         this.threadpool.shutdown();
@@ -814,9 +817,6 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
     // Stop the home heartbeat thread
     this.homeHeartbeartHandler.shutdown();
     this.homeRMRelayer.shutdown();
-    for (AMRMClientRelayer relayer : this.secondaryRelayers.values()) {
-      relayer.shutdown();
-    }
 
     super.shutdown();
   }

+ 0 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java

@@ -206,7 +206,6 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest {
         finishApplicationMaster(testAppId, FinalApplicationStatus.FAILED);
 
     Assert.assertNotNull(finshResponse);
-    Assert.assertEquals(false, finshResponse.getIsUnregistered());
 
     try {
       // Try to finish an application master that is already finished.