|
@@ -22,6 +22,7 @@ import java.io.BufferedReader;
|
|
|
import java.io.File;
|
|
|
import java.io.IOException;
|
|
|
import java.io.InputStreamReader;
|
|
|
+import java.net.InetAddress;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.EnumSet;
|
|
|
import java.util.Map;
|
|
@@ -41,6 +42,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
|
|
+import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
|
|
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
|
|
import org.apache.hadoop.yarn.api.records.Priority;
|
|
@@ -81,6 +83,8 @@ public class UnmanagedAMLauncher {
|
|
|
// set the classpath explicitly
|
|
|
private String classpath = null;
|
|
|
|
|
|
+ private volatile boolean amCompleted = false;
|
|
|
+
|
|
|
/**
|
|
|
* @param args
|
|
|
* Command line arguments
|
|
@@ -179,8 +183,18 @@ public class UnmanagedAMLauncher {
|
|
|
if(!setClasspath && classpath!=null) {
|
|
|
envAMList.add("CLASSPATH="+classpath);
|
|
|
}
|
|
|
-
|
|
|
- envAMList.add(ApplicationConstants.AM_APP_ATTEMPT_ID_ENV + "=" + attemptId);
|
|
|
+
|
|
|
+ ContainerId containerId = Records.newRecord(ContainerId.class);
|
|
|
+ containerId.setApplicationAttemptId(attemptId);
|
|
|
+ containerId.setId(0);
|
|
|
+
|
|
|
+ String hostname = InetAddress.getLocalHost().getHostName();
|
|
|
+ envAMList.add(ApplicationConstants.AM_CONTAINER_ID_ENV + "=" + containerId);
|
|
|
+ envAMList.add(ApplicationConstants.NM_HOST_ENV + "=" + hostname);
|
|
|
+ envAMList.add(ApplicationConstants.NM_HTTP_PORT_ENV + "=0");
|
|
|
+ envAMList.add(ApplicationConstants.NM_PORT_ENV + "=0");
|
|
|
+ envAMList.add(ApplicationConstants.APP_SUBMIT_TIME_ENV + "="
|
|
|
+ + System.currentTimeMillis());
|
|
|
|
|
|
String[] envAM = new String[envAMList.size()];
|
|
|
Process amProc = Runtime.getRuntime().exec(amCmd, envAMList.toArray(envAM));
|
|
@@ -233,8 +247,10 @@ public class UnmanagedAMLauncher {
|
|
|
LOG.info("AM process exited with value: " + exitCode);
|
|
|
} catch (InterruptedException e) {
|
|
|
e.printStackTrace();
|
|
|
+ } finally {
|
|
|
+ amCompleted = true;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
try {
|
|
|
// make sure that the error thread exits
|
|
|
// on Windows these threads sometimes get stuck and hang the execution
|
|
@@ -306,6 +322,7 @@ public class UnmanagedAMLauncher {
|
|
|
appReport = monitorApplication(appId, EnumSet.of(
|
|
|
YarnApplicationState.KILLED, YarnApplicationState.FAILED,
|
|
|
YarnApplicationState.FINISHED));
|
|
|
+
|
|
|
YarnApplicationState appState = appReport.getYarnApplicationState();
|
|
|
FinalApplicationStatus appStatus = appReport.getFinalApplicationStatus();
|
|
|
|
|
@@ -341,6 +358,19 @@ public class UnmanagedAMLauncher {
|
|
|
private ApplicationReport monitorApplication(ApplicationId appId,
|
|
|
Set<YarnApplicationState> finalState) throws YarnRemoteException {
|
|
|
|
|
|
+ long foundAMCompletedTime = 0;
|
|
|
+ final int timeToWaitMS = 10000;
|
|
|
+ StringBuilder expectedFinalState = new StringBuilder();
|
|
|
+ boolean first = true;
|
|
|
+ for (YarnApplicationState state : finalState) {
|
|
|
+ if (first) {
|
|
|
+ first = false;
|
|
|
+ expectedFinalState.append(state.name());
|
|
|
+ } else {
|
|
|
+ expectedFinalState.append("," + state.name());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
while (true) {
|
|
|
|
|
|
// Check app status every 1 second.
|
|
@@ -370,8 +400,24 @@ public class UnmanagedAMLauncher {
|
|
|
return report;
|
|
|
}
|
|
|
|
|
|
+ // wait for 10 seconds after process has completed for app report to
|
|
|
+ // come back
|
|
|
+ if (amCompleted) {
|
|
|
+ if (foundAMCompletedTime == 0) {
|
|
|
+ foundAMCompletedTime = System.currentTimeMillis();
|
|
|
+ } else if ((System.currentTimeMillis() - foundAMCompletedTime)
|
|
|
+ > timeToWaitMS) {
|
|
|
+ LOG.warn("Waited " + timeToWaitMS/1000
|
|
|
+ + " seconds after process completed for AppReport"
|
|
|
+ + " to reach desired final state. Not waiting anymore."
|
|
|
+ + "CurrentState = " + state
|
|
|
+ + ", ExpectedStates = " + expectedFinalState.toString());
|
|
|
+ throw new RuntimeException("Failed to receive final expected state"
|
|
|
+ + " in ApplicationReport"
|
|
|
+ + ", CurrentState=" + state
|
|
|
+ + ", ExpectedStates=" + expectedFinalState.toString());
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
-
|
|
|
}
|
|
|
-
|
|
|
}
|