|
@@ -22,6 +22,8 @@ import java.io.File;
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
import java.net.InetAddress;
|
|
import java.net.InetAddress;
|
|
import java.net.UnknownHostException;
|
|
import java.net.UnknownHostException;
|
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
|
+import java.util.concurrent.ConcurrentMap;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.commons.logging.LogFactory;
|
|
@@ -34,8 +36,10 @@ import org.apache.hadoop.service.AbstractService;
|
|
import org.apache.hadoop.service.CompositeService;
|
|
import org.apache.hadoop.service.CompositeService;
|
|
import org.apache.hadoop.util.Shell;
|
|
import org.apache.hadoop.util.Shell;
|
|
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
|
|
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
|
|
|
|
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
|
|
|
+import org.apache.hadoop.yarn.event.EventHandler;
|
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
|
@@ -52,6 +56,10 @@ import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
|
|
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl;
|
|
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
|
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
|
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
|
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
|
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
|
|
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
|
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -83,6 +91,9 @@ public class MiniYARNCluster extends CompositeService {
|
|
|
|
|
|
private ResourceManagerWrapper resourceManagerWrapper;
|
|
private ResourceManagerWrapper resourceManagerWrapper;
|
|
|
|
|
|
|
|
+ private ConcurrentMap<ApplicationAttemptId, Long> appMasters =
|
|
|
|
+ new ConcurrentHashMap<ApplicationAttemptId, Long>(16, 0.75f, 2);
|
|
|
|
+
|
|
private File testWorkDir;
|
|
private File testWorkDir;
|
|
|
|
|
|
// Number of nm-local-dirs per nodemanager
|
|
// Number of nm-local-dirs per nodemanager
|
|
@@ -210,6 +221,16 @@ public class MiniYARNCluster extends CompositeService {
|
|
};
|
|
};
|
|
};
|
|
};
|
|
resourceManager.init(conf);
|
|
resourceManager.init(conf);
|
|
|
|
+ resourceManager.getRMContext().getDispatcher().register(RMAppAttemptEventType.class,
|
|
|
|
+ new EventHandler<RMAppAttemptEvent>() {
|
|
|
|
+ public void handle(RMAppAttemptEvent event) {
|
|
|
|
+ if (event instanceof RMAppAttemptRegistrationEvent) {
|
|
|
|
+ appMasters.put(event.getApplicationAttemptId(), event.getTimestamp());
|
|
|
|
+ } else if (event instanceof RMAppAttemptUnregistrationEvent) {
|
|
|
|
+ appMasters.remove(event.getApplicationAttemptId());
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ });
|
|
super.serviceInit(conf);
|
|
super.serviceInit(conf);
|
|
}
|
|
}
|
|
|
|
|
|
@@ -243,9 +264,22 @@ public class MiniYARNCluster extends CompositeService {
|
|
WebAppUtils.getRMWebAppURLWithoutScheme(getConfig()));
|
|
WebAppUtils.getRMWebAppURLWithoutScheme(getConfig()));
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private void waitForAppMastersToFinish(long timeoutMillis) throws InterruptedException {
|
|
|
|
+ long started = System.currentTimeMillis();
|
|
|
|
+ synchronized (appMasters) {
|
|
|
|
+ while (!appMasters.isEmpty() && System.currentTimeMillis() - started < timeoutMillis) {
|
|
|
|
+ appMasters.wait(1000);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ if (!appMasters.isEmpty()) {
|
|
|
|
+ LOG.warn("Stopping RM while some app masters are still alive");
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
@Override
|
|
@Override
|
|
protected synchronized void serviceStop() throws Exception {
|
|
protected synchronized void serviceStop() throws Exception {
|
|
if (resourceManager != null) {
|
|
if (resourceManager != null) {
|
|
|
|
+ waitForAppMastersToFinish(5000);
|
|
resourceManager.stop();
|
|
resourceManager.stop();
|
|
}
|
|
}
|
|
super.serviceStop();
|
|
super.serviceStop();
|