|
@@ -48,6 +48,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
|
|
|
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
|
|
import org.junit.Assert;
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
|
|
|
/**
|
|
|
* Extends the FederationClientInterceptor and overrides methods to provide a
|
|
@@ -56,6 +58,9 @@ import org.junit.Assert;
|
|
|
public class TestableFederationClientInterceptor
|
|
|
extends FederationClientInterceptor {
|
|
|
|
|
|
+ private static final Logger LOG =
|
|
|
+ LoggerFactory.getLogger(TestableFederationClientInterceptor.class);
|
|
|
+
|
|
|
private ConcurrentHashMap<SubClusterId, MockRM> mockRMs =
|
|
|
new ConcurrentHashMap<>();
|
|
|
|
|
@@ -161,4 +166,31 @@ public class TestableFederationClientInterceptor
|
|
|
throw new RuntimeException(e);
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void shutdown() {
|
|
|
+ if (mockRMs != null && !mockRMs.isEmpty()) {
|
|
|
+ for (Map.Entry<SubClusterId, MockRM> item : mockRMs.entrySet()) {
|
|
|
+ SubClusterId subClusterId = item.getKey();
|
|
|
+
|
|
|
+ // close mockNM
|
|
|
+ MockNM mockNM = mockNMs.getOrDefault(subClusterId, null);
|
|
|
+ try {
|
|
|
+ mockNM.unRegisterNode();
|
|
|
+ mockNM = null;
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.error("mockNM unRegisterNode error.", e);
|
|
|
+ }
|
|
|
+
|
|
|
+ // close mockRM
|
|
|
+ MockRM mockRM = item.getValue();
|
|
|
+ if (mockRM != null) {
|
|
|
+ mockRM.stop();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ mockNMs.clear();
|
|
|
+ mockRMs.clear();
|
|
|
+ super.shutdown();
|
|
|
+ }
|
|
|
}
|