|
@@ -63,6 +63,8 @@ import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
|
|
import org.apache.hadoop.yarn.util.Records;
|
|
|
import org.apache.log4j.Logger;
|
|
|
|
|
@@ -133,8 +135,7 @@ public abstract class AMSimulator extends TaskRunner.Task {
|
|
|
* register with RM
|
|
|
*/
|
|
|
@Override
|
|
|
- public void firstStep()
|
|
|
- throws YarnException, IOException, InterruptedException {
|
|
|
+ public void firstStep() throws Exception {
|
|
|
simulateStartTimeMS = System.currentTimeMillis() -
|
|
|
SLSRunner.getRunner().getStartTimeMS();
|
|
|
|
|
@@ -149,8 +150,7 @@ public abstract class AMSimulator extends TaskRunner.Task {
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public void middleStep()
|
|
|
- throws InterruptedException, YarnException, IOException {
|
|
|
+ public void middleStep() throws Exception {
|
|
|
// process responses in the queue
|
|
|
processResponseQueue();
|
|
|
|
|
@@ -162,7 +162,7 @@ public abstract class AMSimulator extends TaskRunner.Task {
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public void lastStep() {
|
|
|
+ public void lastStep() throws Exception {
|
|
|
LOG.info(MessageFormat.format("Application {0} is shutting down.", appId));
|
|
|
// unregister tracking
|
|
|
if (isTracked) {
|
|
@@ -173,26 +173,19 @@ public abstract class AMSimulator extends TaskRunner.Task {
|
|
|
.newRecordInstance(FinishApplicationMasterRequest.class);
|
|
|
finishAMRequest.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
|
|
|
|
|
|
- try {
|
|
|
- UserGroupInformation ugi =
|
|
|
- UserGroupInformation.createRemoteUser(appAttemptId.toString());
|
|
|
- Token<AMRMTokenIdentifier> token =
|
|
|
- rm.getRMContext().getRMApps().get(appAttemptId.getApplicationId())
|
|
|
- .getRMAppAttempt(appAttemptId).getAMRMToken();
|
|
|
- ugi.addTokenIdentifier(token.decodeIdentifier());
|
|
|
- ugi.doAs(new PrivilegedExceptionAction<Object>() {
|
|
|
- @Override
|
|
|
- public Object run() throws Exception {
|
|
|
- rm.getApplicationMasterService()
|
|
|
- .finishApplicationMaster(finishAMRequest);
|
|
|
- return null;
|
|
|
- }
|
|
|
- });
|
|
|
- } catch (IOException e) {
|
|
|
- e.printStackTrace();
|
|
|
- } catch (InterruptedException e) {
|
|
|
- e.printStackTrace();
|
|
|
- }
|
|
|
+ UserGroupInformation ugi =
|
|
|
+ UserGroupInformation.createRemoteUser(appAttemptId.toString());
|
|
|
+ Token<AMRMTokenIdentifier> token = rm.getRMContext().getRMApps().get(appId)
|
|
|
+ .getRMAppAttempt(appAttemptId).getAMRMToken();
|
|
|
+ ugi.addTokenIdentifier(token.decodeIdentifier());
|
|
|
+ ugi.doAs(new PrivilegedExceptionAction<Object>() {
|
|
|
+ @Override
|
|
|
+ public Object run() throws Exception {
|
|
|
+ rm.getApplicationMasterService()
|
|
|
+ .finishApplicationMaster(finishAMRequest);
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ });
|
|
|
|
|
|
simulateFinishTimeMS = System.currentTimeMillis() -
|
|
|
SLSRunner.getRunner().getStartTimeMS();
|
|
@@ -230,11 +223,9 @@ public abstract class AMSimulator extends TaskRunner.Task {
|
|
|
return createAllocateRequest(ask, new ArrayList<ContainerId>());
|
|
|
}
|
|
|
|
|
|
- protected abstract void processResponseQueue()
|
|
|
- throws InterruptedException, YarnException, IOException;
|
|
|
+ protected abstract void processResponseQueue() throws Exception;
|
|
|
|
|
|
- protected abstract void sendContainerRequest()
|
|
|
- throws YarnException, IOException, InterruptedException;
|
|
|
+ protected abstract void sendContainerRequest() throws Exception;
|
|
|
|
|
|
protected abstract void checkStop();
|
|
|
|
|
@@ -280,11 +271,18 @@ public abstract class AMSimulator extends TaskRunner.Task {
|
|
|
// waiting until application ACCEPTED
|
|
|
RMApp app = rm.getRMContext().getRMApps().get(appId);
|
|
|
while(app.getState() != RMAppState.ACCEPTED) {
|
|
|
- Thread.sleep(50);
|
|
|
+ Thread.sleep(10);
|
|
|
}
|
|
|
|
|
|
- appAttemptId = rm.getRMContext().getRMApps().get(appId)
|
|
|
- .getCurrentAppAttempt().getAppAttemptId();
|
|
|
+ // Waiting until application attempt reach LAUNCHED
|
|
|
+ // "Unmanaged AM must register after AM attempt reaches LAUNCHED state"
|
|
|
+ this.appAttemptId = rm.getRMContext().getRMApps().get(appId)
|
|
|
+ .getCurrentAppAttempt().getAppAttemptId();
|
|
|
+ RMAppAttempt rmAppAttempt = rm.getRMContext().getRMApps().get(appId)
|
|
|
+ .getCurrentAppAttempt();
|
|
|
+ while (rmAppAttempt.getAppAttemptState() != RMAppAttemptState.LAUNCHED) {
|
|
|
+ Thread.sleep(10);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
private void registerAM()
|
|
@@ -297,10 +295,9 @@ public abstract class AMSimulator extends TaskRunner.Task {
|
|
|
amRegisterRequest.setTrackingUrl("localhost:1000");
|
|
|
|
|
|
UserGroupInformation ugi =
|
|
|
- UserGroupInformation.createRemoteUser(appAttemptId.toString());
|
|
|
- Token<AMRMTokenIdentifier> token =
|
|
|
- rm.getRMContext().getRMApps().get(appAttemptId.getApplicationId())
|
|
|
- .getRMAppAttempt(appAttemptId).getAMRMToken();
|
|
|
+ UserGroupInformation.createRemoteUser(appAttemptId.toString());
|
|
|
+ Token<AMRMTokenIdentifier> token = rm.getRMContext().getRMApps().get(appId)
|
|
|
+ .getRMAppAttempt(appAttemptId).getAMRMToken();
|
|
|
ugi.addTokenIdentifier(token.decodeIdentifier());
|
|
|
|
|
|
ugi.doAs(
|