|
@@ -70,6 +70,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
|
|
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
|
|
import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
|
|
|
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
|
|
|
+import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
|
|
@@ -161,7 +162,7 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|
|
private long storedFinishTime = 0;
|
|
|
private int firstAttemptIdInStateStore = 1;
|
|
|
private int nextAttemptId = 1;
|
|
|
- private String collectorAddr;
|
|
|
+ private AppCollectorData collectorData;
|
|
|
// This field isn't protected by readlock now.
|
|
|
private volatile RMAppAttempt currentAttempt;
|
|
|
private String queue;
|
|
@@ -213,8 +214,6 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|
|
// Transitions from NEW state
|
|
|
.addTransition(RMAppState.NEW, RMAppState.NEW,
|
|
|
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
|
|
|
- .addTransition(RMAppState.NEW, RMAppState.NEW,
|
|
|
- RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
|
|
|
.addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
|
|
|
RMAppEventType.START, new RMAppNewlySavingTransition())
|
|
|
.addTransition(RMAppState.NEW, EnumSet.of(RMAppState.SUBMITTED,
|
|
@@ -231,8 +230,6 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|
|
// Transitions from NEW_SAVING state
|
|
|
.addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING,
|
|
|
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
|
|
|
- .addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING,
|
|
|
- RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
|
|
|
.addTransition(RMAppState.NEW_SAVING, RMAppState.SUBMITTED,
|
|
|
RMAppEventType.APP_NEW_SAVED, new AddApplicationToSchedulerTransition())
|
|
|
.addTransition(RMAppState.NEW_SAVING, RMAppState.FINAL_SAVING,
|
|
@@ -249,8 +246,6 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|
|
// Transitions from SUBMITTED state
|
|
|
.addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED,
|
|
|
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
|
|
|
- .addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED,
|
|
|
- RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
|
|
|
.addTransition(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING,
|
|
|
RMAppEventType.APP_REJECTED,
|
|
|
new FinalSavingTransition(
|
|
@@ -265,8 +260,6 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|
|
// Transitions from ACCEPTED state
|
|
|
.addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
|
|
|
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
|
|
|
- .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
|
|
|
- RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
|
|
|
.addTransition(RMAppState.ACCEPTED, RMAppState.RUNNING,
|
|
|
RMAppEventType.ATTEMPT_REGISTERED, new RMAppStateUpdateTransition(
|
|
|
YarnApplicationState.RUNNING))
|
|
@@ -292,8 +285,6 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|
|
// Transitions from RUNNING state
|
|
|
.addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
|
|
|
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
|
|
|
- .addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
|
|
|
- RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
|
|
|
.addTransition(RMAppState.RUNNING, RMAppState.FINAL_SAVING,
|
|
|
RMAppEventType.ATTEMPT_UNREGISTERED,
|
|
|
new FinalSavingTransition(
|
|
@@ -323,8 +314,6 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|
|
.addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
|
|
|
RMAppEventType.APP_RUNNING_ON_NODE,
|
|
|
new AppRunningOnNodeTransition())
|
|
|
- .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
|
|
|
- RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
|
|
|
// ignorable transitions
|
|
|
.addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
|
|
|
EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.KILL,
|
|
@@ -336,8 +325,6 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|
|
.addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
|
|
|
RMAppEventType.APP_RUNNING_ON_NODE,
|
|
|
new AppRunningOnNodeTransition())
|
|
|
- .addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
|
|
|
- RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
|
|
|
// ignorable transitions
|
|
|
.addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
|
|
|
EnumSet.of(RMAppEventType.NODE_UPDATE,
|
|
@@ -349,8 +336,6 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|
|
.addTransition(RMAppState.KILLING, RMAppState.KILLING,
|
|
|
RMAppEventType.APP_RUNNING_ON_NODE,
|
|
|
new AppRunningOnNodeTransition())
|
|
|
- .addTransition(RMAppState.KILLING, RMAppState.KILLING,
|
|
|
- RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
|
|
|
.addTransition(RMAppState.KILLING, RMAppState.FINAL_SAVING,
|
|
|
RMAppEventType.ATTEMPT_KILLED,
|
|
|
new FinalSavingTransition(
|
|
@@ -623,18 +608,16 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public String getCollectorAddr() {
|
|
|
- return this.collectorAddr;
|
|
|
+ public AppCollectorData getCollectorData() {
|
|
|
+ return this.collectorData;
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
- public void setCollectorAddr(String collectorAddress) {
|
|
|
- this.collectorAddr = collectorAddress;
|
|
|
+ public void setCollectorData(AppCollectorData incomingData) {
|
|
|
+ this.collectorData = incomingData;
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
- public void removeCollectorAddr() {
|
|
|
- this.collectorAddr = null;
|
|
|
+ public void removeCollectorData() {
|
|
|
+ this.collectorData = null;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -1005,24 +988,6 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|
|
};
|
|
|
}
|
|
|
|
|
|
- private static final class RMAppCollectorUpdateTransition
|
|
|
- extends RMAppTransition {
|
|
|
-
|
|
|
- public void transition(RMAppImpl app, RMAppEvent event) {
|
|
|
- if (YarnConfiguration.timelineServiceV2Enabled(app.conf)) {
|
|
|
- LOG.info("Updating collector info for app: " + app.getApplicationId());
|
|
|
-
|
|
|
- RMAppCollectorUpdateEvent appCollectorUpdateEvent =
|
|
|
- (RMAppCollectorUpdateEvent) event;
|
|
|
- // Update collector address
|
|
|
- app.setCollectorAddr(appCollectorUpdateEvent.getAppCollectorAddr());
|
|
|
-
|
|
|
- // TODO persistent to RMStateStore for recover
|
|
|
- // Save to RMStateStore
|
|
|
- }
|
|
|
- };
|
|
|
- }
|
|
|
-
|
|
|
private static final class RMAppNodeUpdateTransition extends RMAppTransition {
|
|
|
public void transition(RMAppImpl app, RMAppEvent event) {
|
|
|
RMAppNodeUpdateEvent nodeUpdateEvent = (RMAppNodeUpdateEvent) event;
|