|
@@ -116,6 +116,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
|
|
NMSS_CLASS_PREFIX + "secondarySC/";
|
|
NMSS_CLASS_PREFIX + "secondarySC/";
|
|
public static final String STRING_TO_BYTE_FORMAT = "UTF-8";
|
|
public static final String STRING_TO_BYTE_FORMAT = "UTF-8";
|
|
|
|
|
|
|
|
+ private ApplicationAttemptId attemptId;
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* The home sub-cluster is the sub-cluster where the AM container is running
|
|
* The home sub-cluster is the sub-cluster where the AM container is running
|
|
* in.
|
|
* in.
|
|
@@ -124,20 +126,6 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
|
|
private SubClusterId homeSubClusterId;
|
|
private SubClusterId homeSubClusterId;
|
|
private volatile int lastHomeResponseId;
|
|
private volatile int lastHomeResponseId;
|
|
|
|
|
|
- /**
|
|
|
|
- * A flag for work preserving NM restart. If we just recovered, we need to
|
|
|
|
- * generate an {@link ApplicationMasterNotRegisteredException} exception back
|
|
|
|
- * to AM (similar to what RM will do after its restart/fail-over) in its next
|
|
|
|
- * allocate to trigger AM re-register (which we will shield from RM and just
|
|
|
|
- * return our saved register response) and a full pending requests re-send, so
|
|
|
|
- * that all the {@link AMRMClientRelayer} will be re-populated with all
|
|
|
|
- * pending requests.
|
|
|
|
- *
|
|
|
|
- * TODO: When split-merge is not idempotent, this can lead to some
|
|
|
|
- * over-allocation without a full cancel to RM.
|
|
|
|
- */
|
|
|
|
- private volatile boolean justRecovered;
|
|
|
|
-
|
|
|
|
/**
|
|
/**
|
|
* UAM pool for secondary sub-clusters (ones other than home sub-cluster),
|
|
* UAM pool for secondary sub-clusters (ones other than home sub-cluster),
|
|
* using subClusterId as uamId. One UAM is created per sub-cluster RM except
|
|
* using subClusterId as uamId. One UAM is created per sub-cluster RM except
|
|
@@ -156,15 +144,29 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
|
|
*/
|
|
*/
|
|
private Map<String, AMRMClientRelayer> secondaryRelayers;
|
|
private Map<String, AMRMClientRelayer> secondaryRelayers;
|
|
|
|
|
|
- /** Thread pool used for asynchronous operations. */
|
|
|
|
- private ExecutorService threadpool;
|
|
|
|
-
|
|
|
|
/**
|
|
/**
|
|
* Stores the AllocateResponses that are received asynchronously from all the
|
|
* Stores the AllocateResponses that are received asynchronously from all the
|
|
* sub-cluster resource managers except the home RM.
|
|
* sub-cluster resource managers except the home RM.
|
|
*/
|
|
*/
|
|
private Map<SubClusterId, List<AllocateResponse>> asyncResponseSink;
|
|
private Map<SubClusterId, List<AllocateResponse>> asyncResponseSink;
|
|
|
|
|
|
|
|
+ /** Thread pool used for asynchronous operations. */
|
|
|
|
+ private ExecutorService threadpool;
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * A flag for work preserving NM restart. If we just recovered, we need to
|
|
|
|
+ * generate an {@link ApplicationMasterNotRegisteredException} exception back
|
|
|
|
+ * to AM (similar to what RM will do after its restart/fail-over) in its next
|
|
|
|
+ * allocate to trigger AM re-register (which we will shield from RM and just
|
|
|
|
+ * return our saved register response) and a full pending requests re-send, so
|
|
|
|
+ * that all the {@link AMRMClientRelayer} will be re-populated with all
|
|
|
|
+ * pending requests.
|
|
|
|
+ *
|
|
|
|
+ * TODO: When split-merge is not idempotent, this can lead to some
|
|
|
|
+ * over-allocation without a full cancel to RM.
|
|
|
|
+ */
|
|
|
|
+ private volatile boolean justRecovered;
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Used to keep track of the container Id and the sub cluster RM that created
|
|
* Used to keep track of the container Id and the sub cluster RM that created
|
|
* the container, so that we know which sub-cluster to forward later requests
|
|
* the container, so that we know which sub-cluster to forward later requests
|
|
@@ -179,7 +181,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
|
|
private RegisterApplicationMasterRequest amRegistrationRequest;
|
|
private RegisterApplicationMasterRequest amRegistrationRequest;
|
|
|
|
|
|
/**
|
|
/**
|
|
- * The original registration response from home RM. This instance is reused
|
|
|
|
|
|
+ * The original registration response returned to AM. This instance is reused
|
|
* for duplicate register request from AM, triggered by timeout between AM and
|
|
* for duplicate register request from AM, triggered by timeout between AM and
|
|
* AMRMProxy.
|
|
* AMRMProxy.
|
|
*/
|
|
*/
|
|
@@ -247,12 +249,12 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ this.attemptId = appContext.getApplicationAttemptId();
|
|
|
|
+ ApplicationId appId = this.attemptId.getApplicationId();
|
|
this.homeSubClusterId =
|
|
this.homeSubClusterId =
|
|
SubClusterId.newInstance(YarnConfiguration.getClusterId(conf));
|
|
SubClusterId.newInstance(YarnConfiguration.getClusterId(conf));
|
|
- this.homeRMRelayer = new AMRMClientRelayer(
|
|
|
|
- createHomeRMProxy(appContext, ApplicationMasterProtocol.class,
|
|
|
|
- this.appOwner),
|
|
|
|
- getApplicationContext().getApplicationAttemptId().getApplicationId());
|
|
|
|
|
|
+ this.homeRMRelayer = new AMRMClientRelayer(createHomeRMProxy(appContext,
|
|
|
|
+ ApplicationMasterProtocol.class, this.appOwner), appId);
|
|
|
|
|
|
this.federationFacade = FederationStateStoreFacade.getInstance();
|
|
this.federationFacade = FederationStateStoreFacade.getInstance();
|
|
this.subClusterResolver = this.federationFacade.getSubClusterResolver();
|
|
this.subClusterResolver = this.federationFacade.getSubClusterResolver();
|
|
@@ -267,9 +269,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
|
|
@Override
|
|
@Override
|
|
public void recover(Map<String, byte[]> recoveredDataMap) {
|
|
public void recover(Map<String, byte[]> recoveredDataMap) {
|
|
super.recover(recoveredDataMap);
|
|
super.recover(recoveredDataMap);
|
|
- ApplicationAttemptId attemptId =
|
|
|
|
- getApplicationContext().getApplicationAttemptId();
|
|
|
|
- LOG.info("Recovering data for FederationInterceptor for {}", attemptId);
|
|
|
|
|
|
+ LOG.info("Recovering data for FederationInterceptor for {}",
|
|
|
|
+ this.attemptId);
|
|
if (recoveredDataMap == null) {
|
|
if (recoveredDataMap == null) {
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
@@ -280,7 +281,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
|
|
.parseFrom(recoveredDataMap.get(NMSS_REG_REQUEST_KEY));
|
|
.parseFrom(recoveredDataMap.get(NMSS_REG_REQUEST_KEY));
|
|
this.amRegistrationRequest =
|
|
this.amRegistrationRequest =
|
|
new RegisterApplicationMasterRequestPBImpl(pb);
|
|
new RegisterApplicationMasterRequestPBImpl(pb);
|
|
- LOG.info("amRegistrationRequest recovered for {}", attemptId);
|
|
|
|
|
|
+ LOG.info("amRegistrationRequest recovered for {}", this.attemptId);
|
|
|
|
|
|
// Give the register request to homeRMRelayer for future re-registration
|
|
// Give the register request to homeRMRelayer for future re-registration
|
|
this.homeRMRelayer.setAMRegistrationRequest(this.amRegistrationRequest);
|
|
this.homeRMRelayer.setAMRegistrationRequest(this.amRegistrationRequest);
|
|
@@ -291,7 +292,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
|
|
.parseFrom(recoveredDataMap.get(NMSS_REG_RESPONSE_KEY));
|
|
.parseFrom(recoveredDataMap.get(NMSS_REG_RESPONSE_KEY));
|
|
this.amRegistrationResponse =
|
|
this.amRegistrationResponse =
|
|
new RegisterApplicationMasterResponsePBImpl(pb);
|
|
new RegisterApplicationMasterResponsePBImpl(pb);
|
|
- LOG.info("amRegistrationResponse recovered for {}", attemptId);
|
|
|
|
|
|
+ LOG.info("amRegistrationResponse recovered for {}", this.attemptId);
|
|
// Trigger re-register and full pending re-send only if we have a
|
|
// Trigger re-register and full pending re-send only if we have a
|
|
// saved register response. This should always be true though.
|
|
// saved register response. This should always be true though.
|
|
this.justRecovered = true;
|
|
this.justRecovered = true;
|
|
@@ -301,9 +302,9 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
|
|
Map<String, Token<AMRMTokenIdentifier>> uamMap;
|
|
Map<String, Token<AMRMTokenIdentifier>> uamMap;
|
|
if (this.registryClient != null) {
|
|
if (this.registryClient != null) {
|
|
uamMap = this.registryClient
|
|
uamMap = this.registryClient
|
|
- .loadStateFromRegistry(attemptId.getApplicationId());
|
|
|
|
|
|
+ .loadStateFromRegistry(this.attemptId.getApplicationId());
|
|
LOG.info("Found {} existing UAMs for application {} in Yarn Registry",
|
|
LOG.info("Found {} existing UAMs for application {} in Yarn Registry",
|
|
- uamMap.size(), attemptId.getApplicationId());
|
|
|
|
|
|
+ uamMap.size(), this.attemptId.getApplicationId());
|
|
} else {
|
|
} else {
|
|
uamMap = new HashMap<>();
|
|
uamMap = new HashMap<>();
|
|
for (Entry<String, byte[]> entry : recoveredDataMap.entrySet()) {
|
|
for (Entry<String, byte[]> entry : recoveredDataMap.entrySet()) {
|
|
@@ -319,7 +320,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
LOG.info("Found {} existing UAMs for application {} in NMStateStore",
|
|
LOG.info("Found {} existing UAMs for application {} in NMStateStore",
|
|
- uamMap.size(), attemptId.getApplicationId());
|
|
|
|
|
|
+ uamMap.size(), this.attemptId.getApplicationId());
|
|
}
|
|
}
|
|
|
|
|
|
// Re-attach the UAMs
|
|
// Re-attach the UAMs
|
|
@@ -336,7 +337,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
|
|
|
|
|
|
try {
|
|
try {
|
|
this.uamPool.reAttachUAM(subClusterId.getId(), config,
|
|
this.uamPool.reAttachUAM(subClusterId.getId(), config,
|
|
- attemptId.getApplicationId(),
|
|
|
|
|
|
+ this.attemptId.getApplicationId(),
|
|
this.amRegistrationResponse.getQueue(),
|
|
this.amRegistrationResponse.getQueue(),
|
|
getApplicationContext().getUser(), this.homeSubClusterId.getId(),
|
|
getApplicationContext().getUser(), this.homeSubClusterId.getId(),
|
|
entry.getValue());
|
|
entry.getValue());
|
|
@@ -359,9 +360,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
|
|
subClusterId);
|
|
subClusterId);
|
|
|
|
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
- LOG.error(
|
|
|
|
- "Error reattaching UAM to " + subClusterId + " for " + attemptId,
|
|
|
|
- e);
|
|
|
|
|
|
+ LOG.error("Error reattaching UAM to " + subClusterId + " for "
|
|
|
|
+ + this.attemptId, e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -374,8 +374,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
|
|
createHomeRMProxy(getApplicationContext(),
|
|
createHomeRMProxy(getApplicationContext(),
|
|
ApplicationClientProtocol.class, appSubmitter);
|
|
ApplicationClientProtocol.class, appSubmitter);
|
|
|
|
|
|
- GetContainersResponse response =
|
|
|
|
- rmClient.getContainers(GetContainersRequest.newInstance(attemptId));
|
|
|
|
|
|
+ GetContainersResponse response = rmClient
|
|
|
|
+ .getContainers(GetContainersRequest.newInstance(this.attemptId));
|
|
for (ContainerReport container : response.getContainerList()) {
|
|
for (ContainerReport container : response.getContainerList()) {
|
|
containerIdToSubClusterIdMap.put(container.getContainerId(),
|
|
containerIdToSubClusterIdMap.put(container.getContainerId(),
|
|
this.homeSubClusterId);
|
|
this.homeSubClusterId);
|
|
@@ -388,7 +388,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
|
|
|
|
|
|
LOG.info(
|
|
LOG.info(
|
|
"In all {} UAMs {} running containers including AM recovered for {}",
|
|
"In all {} UAMs {} running containers including AM recovered for {}",
|
|
- uamMap.size(), containers, attemptId);
|
|
|
|
|
|
+ uamMap.size(), containers, this.attemptId);
|
|
|
|
|
|
if (this.amRegistrationResponse != null) {
|
|
if (this.amRegistrationResponse != null) {
|
|
// Initialize the AMRMProxyPolicy
|
|
// Initialize the AMRMProxyPolicy
|
|
@@ -439,12 +439,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
|
|
RegisterApplicationMasterRequestPBImpl pb =
|
|
RegisterApplicationMasterRequestPBImpl pb =
|
|
(RegisterApplicationMasterRequestPBImpl)
|
|
(RegisterApplicationMasterRequestPBImpl)
|
|
this.amRegistrationRequest;
|
|
this.amRegistrationRequest;
|
|
- getNMStateStore().storeAMRMProxyAppContextEntry(
|
|
|
|
- getApplicationContext().getApplicationAttemptId(),
|
|
|
|
|
|
+ getNMStateStore().storeAMRMProxyAppContextEntry(this.attemptId,
|
|
NMSS_REG_REQUEST_KEY, pb.getProto().toByteArray());
|
|
NMSS_REG_REQUEST_KEY, pb.getProto().toByteArray());
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
LOG.error("Error storing AMRMProxy application context entry for "
|
|
LOG.error("Error storing AMRMProxy application context entry for "
|
|
- + getApplicationContext().getApplicationAttemptId(), e);
|
|
|
|
|
|
+ + this.attemptId, e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -479,8 +478,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
|
|
this.homeSubClusterId);
|
|
this.homeSubClusterId);
|
|
}
|
|
}
|
|
|
|
|
|
- ApplicationId appId =
|
|
|
|
- getApplicationContext().getApplicationAttemptId().getApplicationId();
|
|
|
|
|
|
+ ApplicationId appId = this.attemptId.getApplicationId();
|
|
reAttachUAMAndMergeRegisterResponse(this.amRegistrationResponse, appId);
|
|
reAttachUAMAndMergeRegisterResponse(this.amRegistrationResponse, appId);
|
|
|
|
|
|
if (getNMStateStore() != null) {
|
|
if (getNMStateStore() != null) {
|
|
@@ -488,12 +486,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
|
|
RegisterApplicationMasterResponsePBImpl pb =
|
|
RegisterApplicationMasterResponsePBImpl pb =
|
|
(RegisterApplicationMasterResponsePBImpl)
|
|
(RegisterApplicationMasterResponsePBImpl)
|
|
this.amRegistrationResponse;
|
|
this.amRegistrationResponse;
|
|
- getNMStateStore().storeAMRMProxyAppContextEntry(
|
|
|
|
- getApplicationContext().getApplicationAttemptId(),
|
|
|
|
|
|
+ getNMStateStore().storeAMRMProxyAppContextEntry(this.attemptId,
|
|
NMSS_REG_RESPONSE_KEY, pb.getProto().toByteArray());
|
|
NMSS_REG_RESPONSE_KEY, pb.getProto().toByteArray());
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
LOG.error("Error storing AMRMProxy application context entry for "
|
|
LOG.error("Error storing AMRMProxy application context entry for "
|
|
- + getApplicationContext().getApplicationAttemptId(), e);
|
|
|
|
|
|
+ + this.attemptId, e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -535,8 +532,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
|
|
this.lastHomeResponseId = request.getResponseId();
|
|
this.lastHomeResponseId = request.getResponseId();
|
|
|
|
|
|
throw new ApplicationMasterNotRegisteredException(
|
|
throw new ApplicationMasterNotRegisteredException(
|
|
- "AMRMProxy just restarted and recovered for "
|
|
|
|
- + getApplicationContext().getApplicationAttemptId()
|
|
|
|
|
|
+ "AMRMProxy just restarted and recovered for " + this.attemptId
|
|
+ ". AM should re-register and full re-send pending requests.");
|
|
+ ". AM should re-register and full re-send pending requests.");
|
|
}
|
|
}
|
|
|
|
|
|
@@ -553,8 +549,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
|
|
if (this.justRecovered
|
|
if (this.justRecovered
|
|
|| request.getResponseId() > this.lastHomeResponseId) {
|
|
|| request.getResponseId() > this.lastHomeResponseId) {
|
|
LOG.warn("Setting allocate responseId for {} from {} to {}",
|
|
LOG.warn("Setting allocate responseId for {} from {} to {}",
|
|
- getApplicationContext().getApplicationAttemptId(),
|
|
|
|
- request.getResponseId(), this.lastHomeResponseId);
|
|
|
|
|
|
+ this.attemptId, request.getResponseId(), this.lastHomeResponseId);
|
|
request.setResponseId(this.lastHomeResponseId);
|
|
request.setResponseId(this.lastHomeResponseId);
|
|
}
|
|
}
|
|
|
|
|
|
@@ -573,8 +568,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
|
|
|
|
|
|
// Send the request to the home RM and get the response
|
|
// Send the request to the home RM and get the response
|
|
AllocateRequest homeRequest = requests.get(this.homeSubClusterId);
|
|
AllocateRequest homeRequest = requests.get(this.homeSubClusterId);
|
|
- LOG.info("{} heartbeating to home RM with responseId {}",
|
|
|
|
- getApplicationContext().getApplicationAttemptId(),
|
|
|
|
|
|
+ LOG.info("{} heartbeating to home RM with responseId {}", this.attemptId,
|
|
homeRequest.getResponseId());
|
|
homeRequest.getResponseId());
|
|
|
|
|
|
AllocateResponse homeResponse = this.homeRMRelayer.allocate(homeRequest);
|
|
AllocateResponse homeResponse = this.homeRMRelayer.allocate(homeRequest);
|
|
@@ -612,8 +606,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
|
|
}
|
|
}
|
|
|
|
|
|
LOG.info("{} heartbeat response from home RM with responseId {}",
|
|
LOG.info("{} heartbeat response from home RM with responseId {}",
|
|
- getApplicationContext().getApplicationAttemptId(),
|
|
|
|
- homeResponse.getResponseId());
|
|
|
|
|
|
+ this.attemptId, homeResponse.getResponseId());
|
|
|
|
|
|
// Update lastHomeResponseId in three cases:
|
|
// Update lastHomeResponseId in three cases:
|
|
// 1. The normal responseId increments
|
|
// 1. The normal responseId increments
|
|
@@ -676,15 +669,14 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
|
|
secondaryRelayers.remove(subClusterId);
|
|
secondaryRelayers.remove(subClusterId);
|
|
|
|
|
|
if (getNMStateStore() != null) {
|
|
if (getNMStateStore() != null) {
|
|
- getNMStateStore().removeAMRMProxyAppContextEntry(
|
|
|
|
- getApplicationContext().getApplicationAttemptId(),
|
|
|
|
|
|
+ getNMStateStore().removeAMRMProxyAppContextEntry(attemptId,
|
|
NMSS_SECONDARY_SC_PREFIX + subClusterId);
|
|
NMSS_SECONDARY_SC_PREFIX + subClusterId);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
} catch (Throwable e) {
|
|
} catch (Throwable e) {
|
|
LOG.warn("Failed to finish unmanaged application master: "
|
|
LOG.warn("Failed to finish unmanaged application master: "
|
|
+ "RM address: " + subClusterId + " ApplicationId: "
|
|
+ "RM address: " + subClusterId + " ApplicationId: "
|
|
- + getApplicationContext().getApplicationAttemptId(), e);
|
|
|
|
|
|
+ + attemptId, e);
|
|
}
|
|
}
|
|
return new FinishApplicationMasterResponseInfo(uamResponse,
|
|
return new FinishApplicationMasterResponseInfo(uamResponse,
|
|
subClusterId);
|
|
subClusterId);
|
|
@@ -720,8 +712,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
|
|
} catch (Throwable e) {
|
|
} catch (Throwable e) {
|
|
failedToUnRegister = true;
|
|
failedToUnRegister = true;
|
|
LOG.warn("Failed to finish unmanaged application master: "
|
|
LOG.warn("Failed to finish unmanaged application master: "
|
|
- + " ApplicationId: "
|
|
|
|
- + getApplicationContext().getApplicationAttemptId(), e);
|
|
|
|
|
|
+ + " ApplicationId: " + this.attemptId, e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -733,8 +724,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
|
|
// attempt will be launched.
|
|
// attempt will be launched.
|
|
this.uamPool.stop();
|
|
this.uamPool.stop();
|
|
if (this.registryClient != null) {
|
|
if (this.registryClient != null) {
|
|
- this.registryClient.removeAppFromRegistry(getApplicationContext()
|
|
|
|
- .getApplicationAttemptId().getApplicationId());
|
|
|
|
|
|
+ this.registryClient
|
|
|
|
+ .removeAppFromRegistry(this.attemptId.getApplicationId());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return homeResponse;
|
|
return homeResponse;
|
|
@@ -755,12 +746,12 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
|
|
public void shutdown() {
|
|
public void shutdown() {
|
|
// Do not stop uamPool service and kill UAMs here because of possible second
|
|
// Do not stop uamPool service and kill UAMs here because of possible second
|
|
// app attempt
|
|
// app attempt
|
|
- if (threadpool != null) {
|
|
|
|
|
|
+ if (this.threadpool != null) {
|
|
try {
|
|
try {
|
|
- threadpool.shutdown();
|
|
|
|
|
|
+ this.threadpool.shutdown();
|
|
} catch (Throwable ex) {
|
|
} catch (Throwable ex) {
|
|
}
|
|
}
|
|
- threadpool = null;
|
|
|
|
|
|
+ this.threadpool = null;
|
|
}
|
|
}
|
|
super.shutdown();
|
|
super.shutdown();
|
|
}
|
|
}
|
|
@@ -1090,59 +1081,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
|
|
}
|
|
}
|
|
|
|
|
|
this.uamPool.allocateAsync(subClusterId.getId(), entry.getValue(),
|
|
this.uamPool.allocateAsync(subClusterId.getId(), entry.getValue(),
|
|
- new AsyncCallback<AllocateResponse>() {
|
|
|
|
- @Override
|
|
|
|
- public void callback(AllocateResponse response) {
|
|
|
|
- synchronized (asyncResponseSink) {
|
|
|
|
- List<AllocateResponse> responses = null;
|
|
|
|
- if (asyncResponseSink.containsKey(subClusterId)) {
|
|
|
|
- responses = asyncResponseSink.get(subClusterId);
|
|
|
|
- } else {
|
|
|
|
- responses = new ArrayList<>();
|
|
|
|
- asyncResponseSink.put(subClusterId, responses);
|
|
|
|
- }
|
|
|
|
- responses.add(response);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- // Save the new AMRMToken for the UAM if present
|
|
|
|
- if (response.getAMRMToken() != null) {
|
|
|
|
- Token<AMRMTokenIdentifier> newToken = ConverterUtils
|
|
|
|
- .convertFromYarn(response.getAMRMToken(), (Text) null);
|
|
|
|
- // Update the token in registry or NMSS
|
|
|
|
- if (registryClient != null) {
|
|
|
|
- registryClient
|
|
|
|
- .writeAMRMTokenForUAM(
|
|
|
|
- getApplicationContext().getApplicationAttemptId()
|
|
|
|
- .getApplicationId(),
|
|
|
|
- subClusterId.getId(), newToken);
|
|
|
|
- } else if (getNMStateStore() != null) {
|
|
|
|
- try {
|
|
|
|
- getNMStateStore().storeAMRMProxyAppContextEntry(
|
|
|
|
- getApplicationContext().getApplicationAttemptId(),
|
|
|
|
- NMSS_SECONDARY_SC_PREFIX + subClusterId.getId(),
|
|
|
|
- newToken.encodeToUrlString()
|
|
|
|
- .getBytes(STRING_TO_BYTE_FORMAT));
|
|
|
|
- } catch (IOException e) {
|
|
|
|
- LOG.error(
|
|
|
|
- "Error storing UAM token as AMRMProxy "
|
|
|
|
- + "context entry in NMSS for "
|
|
|
|
- + getApplicationContext().getApplicationAttemptId(),
|
|
|
|
- e);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- // Notify policy of secondary sub-cluster responses
|
|
|
|
- try {
|
|
|
|
- policyInterpreter.notifyOfResponse(subClusterId, response);
|
|
|
|
- } catch (YarnException e) {
|
|
|
|
- LOG.warn(
|
|
|
|
- "notifyOfResponse for policy failed for home sub-cluster "
|
|
|
|
- + subClusterId,
|
|
|
|
- e);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- });
|
|
|
|
|
|
+ new HeartbeatCallBack(subClusterId));
|
|
}
|
|
}
|
|
|
|
|
|
return registrations;
|
|
return registrations;
|
|
@@ -1195,7 +1134,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
|
|
try {
|
|
try {
|
|
// For appNameSuffix, use subClusterId of the home sub-cluster
|
|
// For appNameSuffix, use subClusterId of the home sub-cluster
|
|
token = uamPool.launchUAM(subClusterId, config,
|
|
token = uamPool.launchUAM(subClusterId, config,
|
|
- appContext.getApplicationAttemptId().getApplicationId(),
|
|
|
|
|
|
+ attemptId.getApplicationId(),
|
|
amRegistrationResponse.getQueue(), appContext.getUser(),
|
|
amRegistrationResponse.getQueue(), appContext.getUser(),
|
|
homeSubClusterId.toString(), true);
|
|
homeSubClusterId.toString(), true);
|
|
|
|
|
|
@@ -1206,8 +1145,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
|
|
registerRequest);
|
|
registerRequest);
|
|
} catch (Throwable e) {
|
|
} catch (Throwable e) {
|
|
LOG.error("Failed to register application master: "
|
|
LOG.error("Failed to register application master: "
|
|
- + subClusterId + " Application: "
|
|
|
|
- + appContext.getApplicationAttemptId(), e);
|
|
|
|
|
|
+ + subClusterId + " Application: " + attemptId, e);
|
|
}
|
|
}
|
|
return new RegisterApplicationMasterResponseInfo(uamResponse,
|
|
return new RegisterApplicationMasterResponseInfo(uamResponse,
|
|
SubClusterId.newInstance(subClusterId), token);
|
|
SubClusterId.newInstance(subClusterId), token);
|
|
@@ -1232,20 +1170,18 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
|
|
} else {
|
|
} else {
|
|
LOG.info("Successfully registered unmanaged application master: "
|
|
LOG.info("Successfully registered unmanaged application master: "
|
|
+ uamResponse.getSubClusterId() + " ApplicationId: "
|
|
+ uamResponse.getSubClusterId() + " ApplicationId: "
|
|
- + getApplicationContext().getApplicationAttemptId());
|
|
|
|
|
|
+ + this.attemptId);
|
|
successfulRegistrations.put(uamResponse.getSubClusterId(),
|
|
successfulRegistrations.put(uamResponse.getSubClusterId(),
|
|
uamResponse.getResponse());
|
|
uamResponse.getResponse());
|
|
|
|
|
|
// Save the UAM token in registry or NMSS
|
|
// Save the UAM token in registry or NMSS
|
|
if (registryClient != null) {
|
|
if (registryClient != null) {
|
|
registryClient.writeAMRMTokenForUAM(
|
|
registryClient.writeAMRMTokenForUAM(
|
|
- getApplicationContext().getApplicationAttemptId()
|
|
|
|
- .getApplicationId(),
|
|
|
|
|
|
+ this.attemptId.getApplicationId(),
|
|
uamResponse.getSubClusterId().getId(),
|
|
uamResponse.getSubClusterId().getId(),
|
|
uamResponse.getUamToken());
|
|
uamResponse.getUamToken());
|
|
} else if (getNMStateStore() != null) {
|
|
} else if (getNMStateStore() != null) {
|
|
- getNMStateStore().storeAMRMProxyAppContextEntry(
|
|
|
|
- getApplicationContext().getApplicationAttemptId(),
|
|
|
|
|
|
+ getNMStateStore().storeAMRMProxyAppContextEntry(this.attemptId,
|
|
NMSS_SECONDARY_SC_PREFIX
|
|
NMSS_SECONDARY_SC_PREFIX
|
|
+ uamResponse.getSubClusterId().getId(),
|
|
+ uamResponse.getSubClusterId().getId(),
|
|
uamResponse.getUamToken().encodeToUrlString()
|
|
uamResponse.getUamToken().encodeToUrlString()
|
|
@@ -1254,8 +1190,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
|
|
}
|
|
}
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
LOG.warn("Failed to register unmanaged application master: "
|
|
LOG.warn("Failed to register unmanaged application master: "
|
|
- + " ApplicationId: "
|
|
|
|
- + getApplicationContext().getApplicationAttemptId(), e);
|
|
|
|
|
|
+ + " ApplicationId: " + this.attemptId, e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -1490,9 +1425,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
|
|
"Duplicate containerID found in the allocated containers. This"
|
|
"Duplicate containerID found in the allocated containers. This"
|
|
+ " can happen if the RM epoch is not configured properly."
|
|
+ " can happen if the RM epoch is not configured properly."
|
|
+ " ContainerId: " + container.getId().toString()
|
|
+ " ContainerId: " + container.getId().toString()
|
|
- + " ApplicationId: "
|
|
|
|
- + getApplicationContext().getApplicationAttemptId()
|
|
|
|
- + " From RM: " + subClusterId
|
|
|
|
|
|
+ + " ApplicationId: " + this.attemptId + " From RM: "
|
|
|
|
+ + subClusterId
|
|
+ " . Previous container was from sub-cluster: "
|
|
+ " . Previous container was from sub-cluster: "
|
|
+ existingSubClusterId);
|
|
+ existingSubClusterId);
|
|
}
|
|
}
|
|
@@ -1587,6 +1521,59 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
|
|
return this.asyncResponseSink;
|
|
return this.asyncResponseSink;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Async callback for a sub-cluster's allocate (heartbeat) response: queues
|
|
|
|
+ * the response in asyncResponseSink, saves any new AMRM token to the
|
|
|
|
+ * registry or NM state store, and then notifies the policy interpreter.
|
|
|
|
+ */
|
|
|
|
+ private class HeartbeatCallBack implements AsyncCallback<AllocateResponse> {
|
|
|
|
+ private final SubClusterId subClusterId;
|
|
|
|
+
|
|
|
|
+ HeartbeatCallBack(SubClusterId subClusterId) {
|
|
|
|
+ this.subClusterId = subClusterId;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public void callback(AllocateResponse response) {
|
|
|
|
+ synchronized (asyncResponseSink) {
|
|
|
|
+ List<AllocateResponse> responses = asyncResponseSink.get(subClusterId);
|
|
|
|
+ if (responses == null) {
|
|
|
|
+ responses = new ArrayList<>();
|
|
|
|
+ asyncResponseSink.put(subClusterId, responses);
|
|
|
|
+ }
|
|
|
|
+ responses.add(response);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Save the new AMRMToken for the UAM if present
|
|
|
|
+ if (response.getAMRMToken() != null) {
|
|
|
|
+ Token<AMRMTokenIdentifier> newToken = ConverterUtils
|
|
|
|
+ .convertFromYarn(response.getAMRMToken(), (Text) null);
|
|
|
|
+ // Update the token in registry or NMSS
|
|
|
|
+ if (registryClient != null) {
|
|
|
|
+ registryClient.writeAMRMTokenForUAM(attemptId.getApplicationId(),
|
|
|
|
+ subClusterId.getId(), newToken);
|
|
|
|
+ } else if (getNMStateStore() != null) {
|
|
|
|
+ try {
|
|
|
|
+ getNMStateStore().storeAMRMProxyAppContextEntry(attemptId,
|
|
|
|
+ NMSS_SECONDARY_SC_PREFIX + subClusterId.getId(),
|
|
|
|
+ newToken.encodeToUrlString().getBytes(STRING_TO_BYTE_FORMAT));
|
|
|
|
+ } catch (IOException e) {
|
|
|
|
+ LOG.error("Error storing UAM token as AMRMProxy "
|
|
|
|
+ + "context entry in NMSS for " + attemptId, e);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Notify policy of secondary sub-cluster responses
|
|
|
|
+ try {
|
|
|
|
+ policyInterpreter.notifyOfResponse(subClusterId, response);
|
|
|
|
+ } catch (YarnException e) {
|
|
|
|
+ LOG.warn("notifyOfResponse for policy failed for sub-cluster "
|
|
|
|
+ + subClusterId, e);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Private structure for encapsulating SubClusterId and
|
|
* Private structure for encapsulating SubClusterId and
|
|
* RegisterApplicationMasterResponse instances.
|
|
* RegisterApplicationMasterResponse instances.
|