|
@@ -34,6 +34,7 @@ import java.util.concurrent.ConcurrentHashMap;
|
|
|
import java.util.concurrent.ConcurrentMap;
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
|
|
+import com.google.common.annotations.VisibleForTesting;
|
|
|
import org.apache.commons.cli.CommandLine;
|
|
|
import org.apache.commons.cli.GnuParser;
|
|
|
import org.apache.commons.cli.HelpFormatter;
|
|
@@ -281,8 +282,8 @@ public class ApplicationMaster {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public ApplicationMaster() throws Exception {
|
|
|
- // Set up the configuration and RPC
|
|
|
+ public ApplicationMaster() {
|
|
|
+ // Set up the configuration
|
|
|
conf = new YarnConfiguration();
|
|
|
}
|
|
|
|
|
@@ -470,7 +471,7 @@ public class ApplicationMaster {
|
|
|
amRMClient.init(conf);
|
|
|
amRMClient.start();
|
|
|
|
|
|
- containerListener = new NMCallbackHandler();
|
|
|
+ containerListener = createNMCallbackHandler();
|
|
|
nmClientAsync = new NMClientAsyncImpl(containerListener);
|
|
|
nmClientAsync.init(conf);
|
|
|
nmClientAsync.start();
|
|
@@ -500,7 +501,6 @@ public class ApplicationMaster {
|
|
|
containerMemory = maxMem;
|
|
|
}
|
|
|
|
|
|
-
|
|
|
// Setup ask for containers from RM
|
|
|
// Send request for containers to RM
|
|
|
// Until we get our fully allocated quota, we keep on polling RM for
|
|
@@ -513,7 +513,8 @@ public class ApplicationMaster {
|
|
|
}
|
|
|
numRequestedContainers.set(numTotalContainers);
|
|
|
|
|
|
- while (!done) {
|
|
|
+ while (!done
|
|
|
+ && (numCompletedContainers.get() != numTotalContainers)) {
|
|
|
try {
|
|
|
Thread.sleep(200);
|
|
|
} catch (InterruptedException ex) {}
|
|
@@ -522,7 +523,12 @@ public class ApplicationMaster {
|
|
|
|
|
|
return success;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+ @VisibleForTesting
|
|
|
+ NMCallbackHandler createNMCallbackHandler() {
|
|
|
+ return new NMCallbackHandler(this);
|
|
|
+ }
|
|
|
+
|
|
|
private void finish() {
|
|
|
// Join all launched threads
|
|
|
// needed for when we time out
|
|
@@ -566,7 +572,6 @@ public class ApplicationMaster {
|
|
|
LOG.error("Failed to unregister application", e);
|
|
|
}
|
|
|
|
|
|
- done = true;
|
|
|
amRMClient.stop();
|
|
|
}
|
|
|
|
|
@@ -679,10 +684,17 @@ public class ApplicationMaster {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private class NMCallbackHandler implements NMClientAsync.CallbackHandler {
|
|
|
+ @VisibleForTesting
|
|
|
+ static class NMCallbackHandler
|
|
|
+ implements NMClientAsync.CallbackHandler {
|
|
|
|
|
|
private ConcurrentMap<ContainerId, Container> containers =
|
|
|
new ConcurrentHashMap<ContainerId, Container>();
|
|
|
+ private final ApplicationMaster applicationMaster;
|
|
|
+
|
|
|
+ public NMCallbackHandler(ApplicationMaster applicationMaster) {
|
|
|
+ this.applicationMaster = applicationMaster;
|
|
|
+ }
|
|
|
|
|
|
public void addContainer(ContainerId containerId, Container container) {
|
|
|
containers.putIfAbsent(containerId, container);
|
|
@@ -713,7 +725,7 @@ public class ApplicationMaster {
|
|
|
}
|
|
|
Container container = containers.get(containerId);
|
|
|
if (container != null) {
|
|
|
- nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId());
|
|
|
+ applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId());
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -721,6 +733,8 @@ public class ApplicationMaster {
|
|
|
public void onStartContainerError(ContainerId containerId, Throwable t) {
|
|
|
LOG.error("Failed to start Container " + containerId);
|
|
|
containers.remove(containerId);
|
|
|
+ applicationMaster.numCompletedContainers.incrementAndGet();
|
|
|
+ applicationMaster.numFailedContainers.incrementAndGet();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -847,7 +861,6 @@ public class ApplicationMaster {
|
|
|
/**
|
|
|
* Setup the request that will be sent to the RM for the container ask.
|
|
|
*
|
|
|
- * @param numContainers Containers to ask for from RM
|
|
|
* @return the setup ResourceRequest to be sent to RM
|
|
|
*/
|
|
|
private ContainerRequest setupContainerAskForRM() {
|