|
@@ -184,7 +184,7 @@ public class ApplicationMaster {
|
|
private CopyOnWriteArrayList<ContainerId> releasedContainers = new CopyOnWriteArrayList<ContainerId>();
|
|
private CopyOnWriteArrayList<ContainerId> releasedContainers = new CopyOnWriteArrayList<ContainerId>();
|
|
|
|
|
|
// Launch threads
|
|
// Launch threads
|
|
- private List<Thread> launchThreads = new ArrayList<Thread>();
|
|
|
|
|
|
+ private List<Thread> launchThreads = new ArrayList<Thread>();
|
|
|
|
|
|
/**
|
|
/**
|
|
* @param args Command line args
|
|
* @param args Command line args
|
|
@@ -194,7 +194,7 @@ public class ApplicationMaster {
|
|
try {
|
|
try {
|
|
ApplicationMaster appMaster = new ApplicationMaster();
|
|
ApplicationMaster appMaster = new ApplicationMaster();
|
|
LOG.info("Initializing ApplicationMaster");
|
|
LOG.info("Initializing ApplicationMaster");
|
|
- boolean doRun = appMaster.init(args);
|
|
|
|
|
|
+ boolean doRun = appMaster.init(args);
|
|
if (!doRun) {
|
|
if (!doRun) {
|
|
System.exit(0);
|
|
System.exit(0);
|
|
}
|
|
}
|
|
@@ -202,14 +202,14 @@ public class ApplicationMaster {
|
|
} catch (Throwable t) {
|
|
} catch (Throwable t) {
|
|
LOG.fatal("Error running ApplicationMaster", t);
|
|
LOG.fatal("Error running ApplicationMaster", t);
|
|
System.exit(1);
|
|
System.exit(1);
|
|
- }
|
|
|
|
|
|
+ }
|
|
if (result) {
|
|
if (result) {
|
|
LOG.info("Application Master completed successfully. exiting");
|
|
LOG.info("Application Master completed successfully. exiting");
|
|
System.exit(0);
|
|
System.exit(0);
|
|
}
|
|
}
|
|
else {
|
|
else {
|
|
LOG.info("Application Master failed. exiting");
|
|
LOG.info("Application Master failed. exiting");
|
|
- System.exit(2);
|
|
|
|
|
|
+ System.exit(2);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -218,7 +218,7 @@ public class ApplicationMaster {
|
|
*/
|
|
*/
|
|
private void dumpOutDebugInfo() {
|
|
private void dumpOutDebugInfo() {
|
|
|
|
|
|
- LOG.info("Dump debug output");
|
|
|
|
|
|
+ LOG.info("Dump debug output");
|
|
Map<String, String> envs = System.getenv();
|
|
Map<String, String> envs = System.getenv();
|
|
for (Map.Entry<String, String> env : envs.entrySet()) {
|
|
for (Map.Entry<String, String> env : envs.entrySet()) {
|
|
LOG.info("System env: key=" + env.getKey() + ", val=" + env.getValue());
|
|
LOG.info("System env: key=" + env.getKey() + ", val=" + env.getValue());
|
|
@@ -277,7 +277,7 @@ public class ApplicationMaster {
|
|
if (args.length == 0) {
|
|
if (args.length == 0) {
|
|
printUsage(opts);
|
|
printUsage(opts);
|
|
throw new IllegalArgumentException("No args specified for application master to initialize");
|
|
throw new IllegalArgumentException("No args specified for application master to initialize");
|
|
- }
|
|
|
|
|
|
+ }
|
|
|
|
|
|
if (cliParser.hasOption("help")) {
|
|
if (cliParser.hasOption("help")) {
|
|
printUsage(opts);
|
|
printUsage(opts);
|
|
@@ -297,8 +297,8 @@ public class ApplicationMaster {
|
|
appAttemptID = ConverterUtils.toApplicationAttemptId(appIdStr);
|
|
appAttemptID = ConverterUtils.toApplicationAttemptId(appIdStr);
|
|
}
|
|
}
|
|
else {
|
|
else {
|
|
- throw new IllegalArgumentException("Application Attempt Id not set in the environment");
|
|
|
|
- }
|
|
|
|
|
|
+ throw new IllegalArgumentException("Application Attempt Id not set in the environment");
|
|
|
|
+ }
|
|
} else {
|
|
} else {
|
|
ContainerId containerId = ConverterUtils.toContainerId(envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV));
|
|
ContainerId containerId = ConverterUtils.toContainerId(envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV));
|
|
appAttemptID = containerId.getApplicationAttemptId();
|
|
appAttemptID = containerId.getApplicationAttemptId();
|
|
@@ -338,11 +338,11 @@ public class ApplicationMaster {
|
|
if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION)) {
|
|
if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION)) {
|
|
shellScriptPath = envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION);
|
|
shellScriptPath = envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION);
|
|
|
|
|
|
- if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)) {
|
|
|
|
- shellScriptPathTimestamp = Long.valueOf(envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP));
|
|
|
|
- }
|
|
|
|
|
|
+ if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)) {
|
|
|
|
+ shellScriptPathTimestamp = Long.valueOf(envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP));
|
|
|
|
+ }
|
|
if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN)) {
|
|
if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN)) {
|
|
- shellScriptPathLen = Long.valueOf(envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN));
|
|
|
|
|
|
+ shellScriptPathLen = Long.valueOf(envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN));
|
|
}
|
|
}
|
|
|
|
|
|
if (!shellScriptPath.isEmpty()
|
|
if (!shellScriptPath.isEmpty()
|
|
@@ -351,7 +351,7 @@ public class ApplicationMaster {
|
|
LOG.error("Illegal values in env for shell script path"
|
|
LOG.error("Illegal values in env for shell script path"
|
|
+ ", path=" + shellScriptPath
|
|
+ ", path=" + shellScriptPath
|
|
+ ", len=" + shellScriptPathLen
|
|
+ ", len=" + shellScriptPathLen
|
|
- + ", timestamp=" + shellScriptPathTimestamp);
|
|
|
|
|
|
+ + ", timestamp=" + shellScriptPathTimestamp);
|
|
throw new IllegalArgumentException("Illegal values in env for shell script path");
|
|
throw new IllegalArgumentException("Illegal values in env for shell script path");
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -368,7 +368,7 @@ public class ApplicationMaster {
|
|
* @param opts Parsed command line options
|
|
* @param opts Parsed command line options
|
|
*/
|
|
*/
|
|
private void printUsage(Options opts) {
|
|
private void printUsage(Options opts) {
|
|
- new HelpFormatter().printHelp("ApplicationMaster", opts);
|
|
|
|
|
|
+ new HelpFormatter().printHelp("ApplicationMaster", opts);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -378,7 +378,7 @@ public class ApplicationMaster {
|
|
public boolean run() throws YarnRemoteException {
|
|
public boolean run() throws YarnRemoteException {
|
|
LOG.info("Starting ApplicationMaster");
|
|
LOG.info("Starting ApplicationMaster");
|
|
|
|
|
|
- // Connect to ResourceManager
|
|
|
|
|
|
+ // Connect to ResourceManager
|
|
resourceManager = connectToRM();
|
|
resourceManager = connectToRM();
|
|
|
|
|
|
// Setup local RPC Server to accept status requests directly from clients
|
|
// Setup local RPC Server to accept status requests directly from clients
|
|
@@ -395,7 +395,7 @@ public class ApplicationMaster {
|
|
|
|
|
|
// A resource ask has to be atleast the minimum of the capability of the cluster, the value has to be
|
|
// A resource ask has to be atleast the minimum of the capability of the cluster, the value has to be
|
|
// a multiple of the min value and cannot exceed the max.
|
|
// a multiple of the min value and cannot exceed the max.
|
|
- // If it is not an exact multiple of min, the RM will allocate to the nearest multiple of min
|
|
|
|
|
|
+ // If it is not an exact multiple of min, the RM will allocate to the nearest multiple of min
|
|
if (containerMemory < minMem) {
|
|
if (containerMemory < minMem) {
|
|
LOG.info("Container memory specified below min threshold of cluster. Using min value."
|
|
LOG.info("Container memory specified below min threshold of cluster. Using min value."
|
|
+ ", specified=" + containerMemory
|
|
+ ", specified=" + containerMemory
|
|
@@ -409,14 +409,14 @@ public class ApplicationMaster {
|
|
containerMemory = maxMem;
|
|
containerMemory = maxMem;
|
|
}
|
|
}
|
|
|
|
|
|
- // Setup heartbeat emitter
|
|
|
|
|
|
+ // Setup heartbeat emitter
|
|
// TODO poll RM every now and then with an empty request to let RM know that we are alive
|
|
// TODO poll RM every now and then with an empty request to let RM know that we are alive
|
|
// The heartbeat interval after which an AM is timed out by the RM is defined by a config setting:
|
|
// The heartbeat interval after which an AM is timed out by the RM is defined by a config setting:
|
|
// RM_AM_EXPIRY_INTERVAL_MS with default defined by DEFAULT_RM_AM_EXPIRY_INTERVAL_MS
|
|
// RM_AM_EXPIRY_INTERVAL_MS with default defined by DEFAULT_RM_AM_EXPIRY_INTERVAL_MS
|
|
// The allocate calls to the RM count as heartbeats so, for now, this additional heartbeat emitter
|
|
// The allocate calls to the RM count as heartbeats so, for now, this additional heartbeat emitter
|
|
// is not required.
|
|
// is not required.
|
|
|
|
|
|
- // Setup ask for containers from RM
|
|
|
|
|
|
+ // Setup ask for containers from RM
|
|
// Send request for containers to RM
|
|
// Send request for containers to RM
|
|
// Until we get our fully allocated quota, we keep on polling RM for containers
|
|
// Until we get our fully allocated quota, we keep on polling RM for containers
|
|
// Keep looping until all the containers are launched and shell script executed on them
|
|
// Keep looping until all the containers are launched and shell script executed on them
|
|
@@ -426,7 +426,7 @@ public class ApplicationMaster {
|
|
|
|
|
|
while (numCompletedContainers.get() < numTotalContainers
|
|
while (numCompletedContainers.get() < numTotalContainers
|
|
&& !appDone) {
|
|
&& !appDone) {
|
|
- loopCounter++;
|
|
|
|
|
|
+ loopCounter++;
|
|
|
|
|
|
// log current state
|
|
// log current state
|
|
LOG.info("Current application state: loop=" + loopCounter
|
|
LOG.info("Current application state: loop=" + loopCounter
|
|
@@ -435,7 +435,7 @@ public class ApplicationMaster {
|
|
+ ", requested=" + numRequestedContainers
|
|
+ ", requested=" + numRequestedContainers
|
|
+ ", completed=" + numCompletedContainers
|
|
+ ", completed=" + numCompletedContainers
|
|
+ ", failed=" + numFailedContainers
|
|
+ ", failed=" + numFailedContainers
|
|
- + ", currentAllocated=" + numAllocatedContainers);
|
|
|
|
|
|
+ + ", currentAllocated=" + numAllocatedContainers);
|
|
|
|
|
|
// Sleep before each loop when asking RM for containers
|
|
// Sleep before each loop when asking RM for containers
|
|
// to avoid flooding RM with spurious requests when it
|
|
// to avoid flooding RM with spurious requests when it
|
|
@@ -444,7 +444,7 @@ public class ApplicationMaster {
|
|
try {
|
|
try {
|
|
Thread.sleep(1000);
|
|
Thread.sleep(1000);
|
|
} catch (InterruptedException e) {
|
|
} catch (InterruptedException e) {
|
|
- LOG.info("Sleep interrupted " + e.getMessage());
|
|
|
|
|
|
+ LOG.info("Sleep interrupted " + e.getMessage());
|
|
}
|
|
}
|
|
|
|
|
|
// No. of containers to request
|
|
// No. of containers to request
|
|
@@ -457,14 +457,14 @@ public class ApplicationMaster {
|
|
// Setup request to be sent to RM to allocate containers
|
|
// Setup request to be sent to RM to allocate containers
|
|
List<ResourceRequest> resourceReq = new ArrayList<ResourceRequest>();
|
|
List<ResourceRequest> resourceReq = new ArrayList<ResourceRequest>();
|
|
if (askCount > 0) {
|
|
if (askCount > 0) {
|
|
- ResourceRequest containerAsk = setupContainerAskForRM(askCount);
|
|
|
|
|
|
+ ResourceRequest containerAsk = setupContainerAskForRM(askCount);
|
|
resourceReq.add(containerAsk);
|
|
resourceReq.add(containerAsk);
|
|
}
|
|
}
|
|
|
|
|
|
// Send the request to RM
|
|
// Send the request to RM
|
|
LOG.info("Asking RM for containers"
|
|
LOG.info("Asking RM for containers"
|
|
+ ", askCount=" + askCount);
|
|
+ ", askCount=" + askCount);
|
|
- AMResponse amResp = sendContainerAskToRM(resourceReq);
|
|
|
|
|
|
+ AMResponse amResp =sendContainerAskToRM(resourceReq);
|
|
|
|
|
|
// Retrieve list of allocated containers from the response
|
|
// Retrieve list of allocated containers from the response
|
|
List<Container> allocatedContainers = amResp.getAllocatedContainers();
|
|
List<Container> allocatedContainers = amResp.getAllocatedContainers();
|
|
@@ -478,10 +478,10 @@ public class ApplicationMaster {
|
|
+ ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
|
|
+ ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
|
|
+ ", containerState" + allocatedContainer.getState()
|
|
+ ", containerState" + allocatedContainer.getState()
|
|
+ ", containerResourceMemory" + allocatedContainer.getResource().getMemory());
|
|
+ ", containerResourceMemory" + allocatedContainer.getResource().getMemory());
|
|
- // + ", containerToken" + allocatedContainer.getContainerToken().getIdentifier().toString());
|
|
|
|
|
|
+ //+ ", containerToken" + allocatedContainer.getContainerToken().getIdentifier().toString());
|
|
|
|
|
|
LaunchContainerRunnable runnableLaunchContainer = new LaunchContainerRunnable(allocatedContainer);
|
|
LaunchContainerRunnable runnableLaunchContainer = new LaunchContainerRunnable(allocatedContainer);
|
|
- Thread launchThread = new Thread(runnableLaunchContainer);
|
|
|
|
|
|
+ Thread launchThread = new Thread(runnableLaunchContainer);
|
|
|
|
|
|
// launch and start the container on a separate thread to keep the main thread unblocked
|
|
// launch and start the container on a separate thread to keep the main thread unblocked
|
|
// as all containers may not be allocated at one go.
|
|
// as all containers may not be allocated at one go.
|
|
@@ -492,14 +492,14 @@ public class ApplicationMaster {
|
|
// Check what the current available resources in the cluster are
|
|
// Check what the current available resources in the cluster are
|
|
// TODO should we do anything if the available resources are not enough?
|
|
// TODO should we do anything if the available resources are not enough?
|
|
Resource availableResources = amResp.getAvailableResources();
|
|
Resource availableResources = amResp.getAvailableResources();
|
|
- LOG.info("Current available resources in the cluster " + availableResources);
|
|
|
|
|
|
+ LOG.info("Current available resources in the cluster " + availableResources);
|
|
|
|
|
|
- // Check the completed containers
|
|
|
|
|
|
+ // Check the completed containers
|
|
List<ContainerStatus> completedContainers = amResp.getCompletedContainersStatuses();
|
|
List<ContainerStatus> completedContainers = amResp.getCompletedContainersStatuses();
|
|
LOG.info("Got response from RM for container ask, completedCnt=" + completedContainers.size());
|
|
LOG.info("Got response from RM for container ask, completedCnt=" + completedContainers.size());
|
|
- for (ContainerStatus containerStatus : completedContainers) {
|
|
|
|
|
|
+ for (ContainerStatus containerStatus : completedContainers) {
|
|
LOG.info("Got container status for containerID= " + containerStatus.getContainerId()
|
|
LOG.info("Got container status for containerID= " + containerStatus.getContainerId()
|
|
- + ", state=" + containerStatus.getState()
|
|
|
|
|
|
+ + ", state=" + containerStatus.getState()
|
|
+ ", exitStatus=" + containerStatus.getExitStatus()
|
|
+ ", exitStatus=" + containerStatus.getExitStatus()
|
|
+ ", diagnostics=" + containerStatus.getDiagnostics());
|
|
+ ", diagnostics=" + containerStatus.getDiagnostics());
|
|
|
|
|
|
@@ -514,7 +514,7 @@ public class ApplicationMaster {
|
|
// shell script failed
|
|
// shell script failed
|
|
// counts as completed
|
|
// counts as completed
|
|
numCompletedContainers.incrementAndGet();
|
|
numCompletedContainers.incrementAndGet();
|
|
- numFailedContainers.incrementAndGet();
|
|
|
|
|
|
+ numFailedContainers.incrementAndGet();
|
|
}
|
|
}
|
|
else {
|
|
else {
|
|
// something else bad happened
|
|
// something else bad happened
|
|
@@ -541,15 +541,15 @@ public class ApplicationMaster {
|
|
|
|
|
|
LOG.info("Current application state: loop=" + loopCounter
|
|
LOG.info("Current application state: loop=" + loopCounter
|
|
+ ", appDone=" + appDone
|
|
+ ", appDone=" + appDone
|
|
- + ", total=" + numTotalContainers
|
|
|
|
|
|
+ + ", total=" + numTotalContainers
|
|
+ ", requested=" + numRequestedContainers
|
|
+ ", requested=" + numRequestedContainers
|
|
+ ", completed=" + numCompletedContainers
|
|
+ ", completed=" + numCompletedContainers
|
|
+ ", failed=" + numFailedContainers
|
|
+ ", failed=" + numFailedContainers
|
|
- + ", currentAllocated=" + numAllocatedContainers);
|
|
|
|
|
|
+ + ", currentAllocated=" + numAllocatedContainers);
|
|
|
|
|
|
// TODO
|
|
// TODO
|
|
// Add a timeout handling layer
|
|
// Add a timeout handling layer
|
|
- // for misbehaving shell commands
|
|
|
|
|
|
+ // for misbehaving shell commands
|
|
}
|
|
}
|
|
|
|
|
|
// Join all launched threads
|
|
// Join all launched threads
|
|
@@ -561,7 +561,7 @@ public class ApplicationMaster {
|
|
} catch (InterruptedException e) {
|
|
} catch (InterruptedException e) {
|
|
LOG.info("Exception thrown in thread join: " + e.getMessage());
|
|
LOG.info("Exception thrown in thread join: " + e.getMessage());
|
|
e.printStackTrace();
|
|
e.printStackTrace();
|
|
- }
|
|
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
// When the application completes, it should send a finish application signal
|
|
// When the application completes, it should send a finish application signal
|
|
@@ -610,10 +610,11 @@ public class ApplicationMaster {
|
|
* Helper function to connect to CM
|
|
* Helper function to connect to CM
|
|
*/
|
|
*/
|
|
private void connectToCM() {
|
|
private void connectToCM() {
|
|
- String cmIpPortStr = container.getNodeId().getHost() + ":"
|
|
|
|
- + container.getNodeId().getPort();
|
|
|
|
- InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr);
|
|
|
|
- LOG.info("Connecting to ResourceManager at " + cmIpPortStr);
|
|
|
|
|
|
+ LOG.debug("Connecting to ContainerManager for containerid=" + container.getId());
|
|
|
|
+ String cmIpPortStr = container.getNodeId().getHost() + ":"
|
|
|
|
+ + container.getNodeId().getPort();
|
|
|
|
+ InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr);
|
|
|
|
+ LOG.info("Connecting to ContainerManager at " + cmIpPortStr);
|
|
this.cm = ((ContainerManager) rpc.getProxy(ContainerManager.class, cmAddress, conf));
|
|
this.cm = ((ContainerManager) rpc.getProxy(ContainerManager.class, cmAddress, conf));
|
|
}
|
|
}
|
|
|
|
|
|
@@ -626,7 +627,6 @@ public class ApplicationMaster {
|
|
*/
|
|
*/
|
|
public void run() {
|
|
public void run() {
|
|
// Connect to ContainerManager
|
|
// Connect to ContainerManager
|
|
- LOG.info("Connecting to container manager for containerid=" + container.getId());
|
|
|
|
connectToCM();
|
|
connectToCM();
|
|
|
|
|
|
LOG.info("Setting up container launch container for containerid=" + container.getId());
|
|
LOG.info("Setting up container launch container for containerid=" + container.getId());
|
|
@@ -654,7 +654,7 @@ public class ApplicationMaster {
|
|
if (!shellScriptPath.isEmpty()) {
|
|
if (!shellScriptPath.isEmpty()) {
|
|
LocalResource shellRsrc = Records.newRecord(LocalResource.class);
|
|
LocalResource shellRsrc = Records.newRecord(LocalResource.class);
|
|
shellRsrc.setType(LocalResourceType.FILE);
|
|
shellRsrc.setType(LocalResourceType.FILE);
|
|
- shellRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
|
|
|
|
|
|
+ shellRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
|
|
try {
|
|
try {
|
|
shellRsrc.setResource(ConverterUtils.getYarnUrlFromURI(new URI(shellScriptPath)));
|
|
shellRsrc.setResource(ConverterUtils.getYarnUrlFromURI(new URI(shellScriptPath)));
|
|
} catch (URISyntaxException e) {
|
|
} catch (URISyntaxException e) {
|
|
@@ -664,17 +664,17 @@ public class ApplicationMaster {
|
|
|
|
|
|
// A failure scenario on bad input such as invalid shell script path
|
|
// A failure scenario on bad input such as invalid shell script path
|
|
// We know we cannot continue launching the container
|
|
// We know we cannot continue launching the container
|
|
- // so we should release it.
|
|
|
|
|
|
+ // so we should release it.
|
|
// TODO
|
|
// TODO
|
|
numCompletedContainers.incrementAndGet();
|
|
numCompletedContainers.incrementAndGet();
|
|
numFailedContainers.incrementAndGet();
|
|
numFailedContainers.incrementAndGet();
|
|
- return;
|
|
|
|
|
|
+ return;
|
|
}
|
|
}
|
|
shellRsrc.setTimestamp(shellScriptPathTimestamp);
|
|
shellRsrc.setTimestamp(shellScriptPathTimestamp);
|
|
shellRsrc.setSize(shellScriptPathLen);
|
|
shellRsrc.setSize(shellScriptPathLen);
|
|
localResources.put(ExecShellStringPath, shellRsrc);
|
|
localResources.put(ExecShellStringPath, shellRsrc);
|
|
- }
|
|
|
|
- ctx.setLocalResources(localResources);
|
|
|
|
|
|
+ }
|
|
|
|
+ ctx.setLocalResources(localResources);
|
|
|
|
|
|
// Set the necessary command to execute on the allocated container
|
|
// Set the necessary command to execute on the allocated container
|
|
Vector<CharSequence> vargs = new Vector<CharSequence>(5);
|
|
Vector<CharSequence> vargs = new Vector<CharSequence>(5);
|
|
@@ -686,7 +686,7 @@ public class ApplicationMaster {
|
|
vargs.add(ExecShellStringPath);
|
|
vargs.add(ExecShellStringPath);
|
|
}
|
|
}
|
|
|
|
|
|
- // Set args for the shell command if any
|
|
|
|
|
|
+ // Set args for the shell command if any
|
|
vargs.add(shellArgs);
|
|
vargs.add(shellArgs);
|
|
// Add log redirect params
|
|
// Add log redirect params
|
|
// TODO
|
|
// TODO
|
|
@@ -722,19 +722,19 @@ public class ApplicationMaster {
|
|
// Left commented out as the shell scripts are short lived
|
|
// Left commented out as the shell scripts are short lived
|
|
// and we are relying on the status for completed containers from RM to detect status
|
|
// and we are relying on the status for completed containers from RM to detect status
|
|
|
|
|
|
- // GetContainerStatusRequest statusReq = Records.newRecord(GetContainerStatusRequest.class);
|
|
|
|
- // statusReq.setContainerId(container.getId());
|
|
|
|
- // GetContainerStatusResponse statusResp;
|
|
|
|
- // try {
|
|
|
|
- // statusResp = cm.getContainerStatus(statusReq);
|
|
|
|
- // LOG.info("Container Status"
|
|
|
|
- // + ", id=" + container.getId()
|
|
|
|
- // + ", status=" +statusResp.getStatus());
|
|
|
|
- // } catch (YarnRemoteException e) {
|
|
|
|
- // e.printStackTrace();
|
|
|
|
- // }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
|
|
+ // GetContainerStatusRequest statusReq = Records.newRecord(GetContainerStatusRequest.class);
|
|
|
|
+ // statusReq.setContainerId(container.getId());
|
|
|
|
+ // GetContainerStatusResponse statusResp;
|
|
|
|
+ //try {
|
|
|
|
+ //statusResp = cm.getContainerStatus(statusReq);
|
|
|
|
+ // LOG.info("Container Status"
|
|
|
|
+ // + ", id=" + container.getId()
|
|
|
|
+ // + ", status=" +statusResp.getStatus());
|
|
|
|
+ //} catch (YarnRemoteException e) {
|
|
|
|
+ //e.printStackTrace();
|
|
|
|
+ //}
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
|
|
/**
|
|
/**
|
|
* Connect to the Resource Manager
|
|
* Connect to the Resource Manager
|
|
@@ -744,25 +744,25 @@ public class ApplicationMaster {
|
|
YarnConfiguration yarnConf = new YarnConfiguration(conf);
|
|
YarnConfiguration yarnConf = new YarnConfiguration(conf);
|
|
InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get(
|
|
InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get(
|
|
YarnConfiguration.RM_SCHEDULER_ADDRESS,
|
|
YarnConfiguration.RM_SCHEDULER_ADDRESS,
|
|
- YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS));
|
|
|
|
|
|
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS));
|
|
LOG.info("Connecting to ResourceManager at " + rmAddress);
|
|
LOG.info("Connecting to ResourceManager at " + rmAddress);
|
|
return ((AMRMProtocol) rpc.getProxy(AMRMProtocol.class, rmAddress, conf));
|
|
return ((AMRMProtocol) rpc.getProxy(AMRMProtocol.class, rmAddress, conf));
|
|
- }
|
|
|
|
|
|
+ }
|
|
|
|
|
|
/**
|
|
/**
|
|
* Register the Application Master to the Resource Manager
|
|
* Register the Application Master to the Resource Manager
|
|
* @return the registration response from the RM
|
|
* @return the registration response from the RM
|
|
* @throws YarnRemoteException
|
|
* @throws YarnRemoteException
|
|
*/
|
|
*/
|
|
- private RegisterApplicationMasterResponse registerToRM() throws YarnRemoteException {
|
|
|
|
- RegisterApplicationMasterRequest appMasterRequest = Records.newRecord(RegisterApplicationMasterRequest.class);
|
|
|
|
|
|
+ private RegisterApplicationMasterResponse registerToRM() throws YarnRemoteException {
|
|
|
|
+ RegisterApplicationMasterRequest appMasterRequest = Records.newRecord(RegisterApplicationMasterRequest.class);
|
|
|
|
|
|
// set the required info into the registration request:
|
|
// set the required info into the registration request:
|
|
// application attempt id,
|
|
// application attempt id,
|
|
// host on which the app master is running
|
|
// host on which the app master is running
|
|
// rpc port on which the app master accepts requests from the client
|
|
// rpc port on which the app master accepts requests from the client
|
|
// tracking url for the app master
|
|
// tracking url for the app master
|
|
- appMasterRequest.setApplicationAttemptId(appAttemptID);
|
|
|
|
|
|
+ appMasterRequest.setApplicationAttemptId(appAttemptID);
|
|
appMasterRequest.setHost(appMasterHostname);
|
|
appMasterRequest.setHost(appMasterHostname);
|
|
appMasterRequest.setRpcPort(appMasterRpcPort);
|
|
appMasterRequest.setRpcPort(appMasterRpcPort);
|
|
appMasterRequest.setTrackingUrl(appMasterTrackingUrl);
|
|
appMasterRequest.setTrackingUrl(appMasterTrackingUrl);
|
|
@@ -792,7 +792,7 @@ public class ApplicationMaster {
|
|
Priority pri = Records.newRecord(Priority.class);
|
|
Priority pri = Records.newRecord(Priority.class);
|
|
// TODO - what is the range for priority? how to decide?
|
|
// TODO - what is the range for priority? how to decide?
|
|
pri.setPriority(requestPriority);
|
|
pri.setPriority(requestPriority);
|
|
- request.setPriority(pri);
|
|
|
|
|
|
+ request.setPriority(pri);
|
|
|
|
|
|
// Set up resource type requirements
|
|
// Set up resource type requirements
|
|
// For now, only memory is supported so we set memory requirements
|
|
// For now, only memory is supported so we set memory requirements
|
|
@@ -810,7 +810,7 @@ public class ApplicationMaster {
|
|
* @throws YarnRemoteException
|
|
* @throws YarnRemoteException
|
|
*/
|
|
*/
|
|
private AMResponse sendContainerAskToRM(List<ResourceRequest> requestedContainers)
|
|
private AMResponse sendContainerAskToRM(List<ResourceRequest> requestedContainers)
|
|
- throws YarnRemoteException {
|
|
|
|
|
|
+ throws YarnRemoteException {
|
|
AllocateRequest req = Records.newRecord(AllocateRequest.class);
|
|
AllocateRequest req = Records.newRecord(AllocateRequest.class);
|
|
req.setResponseId(rmRequestID.incrementAndGet());
|
|
req.setResponseId(rmRequestID.incrementAndGet());
|
|
req.setApplicationAttemptId(appAttemptID);
|
|
req.setApplicationAttemptId(appAttemptID);
|
|
@@ -830,7 +830,7 @@ public class ApplicationMaster {
|
|
LOG.info("Released container, id=" + id.getId());
|
|
LOG.info("Released container, id=" + id.getId());
|
|
}
|
|
}
|
|
|
|
|
|
- AllocateResponse resp = resourceManager.allocate(req);
|
|
|
|
- return resp.getAMResponse();
|
|
|
|
|
|
+ AllocateResponse resp = resourceManager.allocate(req);
|
|
|
|
+ return resp.getAMResponse();
|
|
}
|
|
}
|
|
}
|
|
}
|