|
@@ -90,6 +90,7 @@ import org.apache.hadoop.yarn.ipc.RPCUtil;
|
|
|
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
|
|
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationACLMapProto;
|
|
|
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
|
|
|
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.FlowContextProto;
|
|
|
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
|
|
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
|
|
|
import org.apache.hadoop.yarn.server.api.ContainerType;
|
|
@@ -400,10 +401,20 @@ public class ContainerManagerImpl extends CompositeService implements
|
|
|
new LogAggregationContextPBImpl(p.getLogAggregationContext());
|
|
|
}
|
|
|
|
|
|
+ FlowContext fc = null;
|
|
|
+ if (p.getFlowContext() != null) {
|
|
|
+ FlowContextProto fcp = p.getFlowContext();
|
|
|
+ fc = new FlowContext(fcp.getFlowName(), fcp.getFlowVersion(),
|
|
|
+ fcp.getFlowRunId());
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug(
|
|
|
+ "Recovering Flow context: " + fc + " for an application " + appId);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
LOG.info("Recovering application " + appId);
|
|
|
- //TODO: Recover flow and flow run ID
|
|
|
- ApplicationImpl app = new ApplicationImpl(dispatcher, p.getUser(), appId,
|
|
|
- creds, context, p.getAppLogAggregationInitedTime());
|
|
|
+ ApplicationImpl app = new ApplicationImpl(dispatcher, p.getUser(), fc,
|
|
|
+ appId, creds, context, p.getAppLogAggregationInitedTime());
|
|
|
context.getApplications().put(appId, app);
|
|
|
app.handle(new ApplicationInitEvent(appId, acls, logAggregationContext));
|
|
|
}
|
|
@@ -980,7 +991,7 @@ public class ContainerManagerImpl extends CompositeService implements
|
|
|
private ContainerManagerApplicationProto buildAppProto(ApplicationId appId,
|
|
|
String user, Credentials credentials,
|
|
|
Map<ApplicationAccessType, String> appAcls,
|
|
|
- LogAggregationContext logAggregationContext) {
|
|
|
+ LogAggregationContext logAggregationContext, FlowContext flowContext) {
|
|
|
|
|
|
ContainerManagerApplicationProto.Builder builder =
|
|
|
ContainerManagerApplicationProto.newBuilder();
|
|
@@ -1015,6 +1026,16 @@ public class ContainerManagerImpl extends CompositeService implements
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ builder.clearFlowContext();
|
|
|
+ if (flowContext != null && flowContext.getFlowName() != null
|
|
|
+ && flowContext.getFlowVersion() != null) {
|
|
|
+ FlowContextProto fcp =
|
|
|
+ FlowContextProto.newBuilder().setFlowName(flowContext.getFlowName())
|
|
|
+ .setFlowVersion(flowContext.getFlowVersion())
|
|
|
+ .setFlowRunId(flowContext.getFlowRunId()).build();
|
|
|
+ builder.setFlowContext(fcp);
|
|
|
+ }
|
|
|
+
|
|
|
return builder.build();
|
|
|
}
|
|
|
|
|
@@ -1067,25 +1088,29 @@ public class ContainerManagerImpl extends CompositeService implements
|
|
|
this.readLock.lock();
|
|
|
try {
|
|
|
if (!isServiceStopped()) {
|
|
|
- // Create the application
|
|
|
- // populate the flow context from the launch context if the timeline
|
|
|
- // service v.2 is enabled
|
|
|
- FlowContext flowContext = null;
|
|
|
- if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
|
|
|
- String flowName = launchContext.getEnvironment().get(
|
|
|
- TimelineUtils.FLOW_NAME_TAG_PREFIX);
|
|
|
- String flowVersion = launchContext.getEnvironment().get(
|
|
|
- TimelineUtils.FLOW_VERSION_TAG_PREFIX);
|
|
|
- String flowRunIdStr = launchContext.getEnvironment().get(
|
|
|
- TimelineUtils.FLOW_RUN_ID_TAG_PREFIX);
|
|
|
- long flowRunId = 0L;
|
|
|
- if (flowRunIdStr != null && !flowRunIdStr.isEmpty()) {
|
|
|
- flowRunId = Long.parseLong(flowRunIdStr);
|
|
|
- }
|
|
|
- flowContext =
|
|
|
- new FlowContext(flowName, flowVersion, flowRunId);
|
|
|
- }
|
|
|
if (!context.getApplications().containsKey(applicationID)) {
|
|
|
+ // Create the application
|
|
|
+ // populate the flow context from the launch context if the timeline
|
|
|
+ // service v.2 is enabled
|
|
|
+ FlowContext flowContext = null;
|
|
|
+ if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
|
|
|
+ String flowName = launchContext.getEnvironment()
|
|
|
+ .get(TimelineUtils.FLOW_NAME_TAG_PREFIX);
|
|
|
+ String flowVersion = launchContext.getEnvironment()
|
|
|
+ .get(TimelineUtils.FLOW_VERSION_TAG_PREFIX);
|
|
|
+ String flowRunIdStr = launchContext.getEnvironment()
|
|
|
+ .get(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX);
|
|
|
+ long flowRunId = 0L;
|
|
|
+ if (flowRunIdStr != null && !flowRunIdStr.isEmpty()) {
|
|
|
+ flowRunId = Long.parseLong(flowRunIdStr);
|
|
|
+ }
|
|
|
+ flowContext = new FlowContext(flowName, flowVersion, flowRunId);
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Flow context: " + flowContext
|
|
|
+ + " created for an application " + applicationID);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
Application application =
|
|
|
new ApplicationImpl(dispatcher, user, flowContext,
|
|
|
applicationID, credentials, context);
|
|
@@ -1099,7 +1124,7 @@ public class ContainerManagerImpl extends CompositeService implements
|
|
|
container.getLaunchContext().getApplicationACLs();
|
|
|
context.getNMStateStore().storeApplication(applicationID,
|
|
|
buildAppProto(applicationID, user, credentials, appAcls,
|
|
|
- logAggregationContext));
|
|
|
+ logAggregationContext, flowContext));
|
|
|
dispatcher.getEventHandler().handle(new ApplicationInitEvent(
|
|
|
applicationID, appAcls, logAggregationContext));
|
|
|
}
|