|
@@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
|
|
import org.apache.hadoop.yarn.api.records.Resource;
|
|
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
|
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
|
|
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
|
|
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
|
|
|
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
|
|
|
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
|
|
@@ -143,10 +144,9 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
|
|
|
Map<SubClusterId, Float> newWeightsConverted = new HashMap<>();
|
|
|
boolean allInactive = true;
|
|
|
WeightedPolicyInfo policy = getPolicyInfo();
|
|
|
- if (policy.getAMRMPolicyWeights() == null
|
|
|
- || policy.getAMRMPolicyWeights().size() == 0) {
|
|
|
- allInactive = false;
|
|
|
- } else {
|
|
|
+
|
|
|
+ if (policy.getAMRMPolicyWeights() != null
|
|
|
+ && policy.getAMRMPolicyWeights().size() > 0) {
|
|
|
for (Map.Entry<SubClusterIdInfo, Float> e : policy.getAMRMPolicyWeights()
|
|
|
.entrySet()) {
|
|
|
if (e.getValue() > 0) {
|
|
@@ -180,7 +180,6 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
|
|
|
|
|
|
this.federationFacade =
|
|
|
policyContext.getFederationStateStoreFacade();
|
|
|
- this.bookkeeper = new AllocationBookkeeper();
|
|
|
this.homeSubcluster = policyContext.getHomeSubcluster();
|
|
|
|
|
|
}
|
|
@@ -197,7 +196,9 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
|
|
|
List<ResourceRequest> resourceRequests) throws YarnException {
|
|
|
|
|
|
// object used to accumulate statistics about the answer, initialize with
|
|
|
- // active subclusters.
|
|
|
+ // active subclusters. Create a new instance per call because this method
|
|
|
+ // can be called concurrently.
|
|
|
+ bookkeeper = new AllocationBookkeeper();
|
|
|
bookkeeper.reinitialize(federationFacade.getSubClusters(true));
|
|
|
|
|
|
List<ResourceRequest> nonLocalizedRequests =
|
|
@@ -238,12 +239,16 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
|
|
|
// we log altogether later
|
|
|
}
|
|
|
if (targetIds != null && targetIds.size() > 0) {
|
|
|
+ boolean hasActive = false;
|
|
|
for (SubClusterId tid : targetIds) {
|
|
|
if (bookkeeper.isActiveAndEnabled(tid)) {
|
|
|
bookkeeper.addRackRR(tid, rr);
|
|
|
+ hasActive = true;
|
|
|
}
|
|
|
}
|
|
|
- continue;
|
|
|
+ if (hasActive) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
// Handle node/rack requests that the SubClusterResolver cannot map to
|
|
@@ -347,7 +352,7 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
|
|
|
originalResourceRequest.getExecutionTypeRequest());
|
|
|
out.setAllocationRequestId(allocationId);
|
|
|
out.setNumContainers((int) Math.ceil(numContainer));
|
|
|
- if (out.isAnyLocation(out.getResourceName())) {
|
|
|
+ if (ResourceRequest.isAnyLocation(out.getResourceName())) {
|
|
|
allocationBookkeeper.addAnyRR(targetId, out);
|
|
|
} else {
|
|
|
allocationBookkeeper.addRackRR(targetId, out);
|
|
@@ -362,7 +367,7 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
|
|
|
*/
|
|
|
private float getLocalityBasedWeighting(long reqId, SubClusterId targetId,
|
|
|
AllocationBookkeeper allocationBookkeeper) {
|
|
|
- float totWeight = allocationBookkeeper.getTotNumLocalizedContainers();
|
|
|
+ float totWeight = allocationBookkeeper.getTotNumLocalizedContainers(reqId);
|
|
|
float localWeight =
|
|
|
allocationBookkeeper.getNumLocalizedContainers(reqId, targetId);
|
|
|
return totWeight > 0 ? localWeight / totWeight : 0;
|
|
@@ -375,7 +380,7 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
|
|
|
private float getPolicyConfigWeighting(SubClusterId targetId,
|
|
|
AllocationBookkeeper allocationBookkeeper) {
|
|
|
float totWeight = allocationBookkeeper.totPolicyWeight;
|
|
|
- Float localWeight = weights.get(targetId);
|
|
|
+ Float localWeight = allocationBookkeeper.policyWeights.get(targetId);
|
|
|
return (localWeight != null && totWeight > 0) ? localWeight / totWeight : 0;
|
|
|
}
|
|
|
|
|
@@ -424,29 +429,36 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
|
|
|
// asks, used to correctly "spread" the corresponding ANY
|
|
|
private Map<Long, Map<SubClusterId, AtomicLong>> countContainersPerRM =
|
|
|
new HashMap<>();
|
|
|
+ private Map<Long, AtomicLong> totNumLocalizedContainers = new HashMap<>();
|
|
|
|
|
|
private Set<SubClusterId> activeAndEnabledSC = new HashSet<>();
|
|
|
- private long totNumLocalizedContainers = 0;
|
|
|
private float totHeadroomMemory = 0;
|
|
|
private int totHeadRoomEnabledRMs = 0;
|
|
|
+ private Map<SubClusterId, Float> policyWeights;
|
|
|
private float totPolicyWeight = 0;
|
|
|
|
|
|
private void reinitialize(
|
|
|
Map<SubClusterId, SubClusterInfo> activeSubclusters)
|
|
|
throws YarnException {
|
|
|
+ if (activeSubclusters == null) {
|
|
|
+ throw new YarnRuntimeException("null activeSubclusters received");
|
|
|
+ }
|
|
|
|
|
|
// reset data structures
|
|
|
answer.clear();
|
|
|
countContainersPerRM.clear();
|
|
|
+ totNumLocalizedContainers.clear();
|
|
|
activeAndEnabledSC.clear();
|
|
|
- totNumLocalizedContainers = 0;
|
|
|
totHeadroomMemory = 0;
|
|
|
totHeadRoomEnabledRMs = 0;
|
|
|
+ // save the reference locally in case the weights get reinitialized
|
|
|
+ // concurrently
|
|
|
+ policyWeights = weights;
|
|
|
totPolicyWeight = 0;
|
|
|
|
|
|
// pre-compute the set of subclusters that are both active and enabled by
|
|
|
// the policy weights, and accumulate their total weight
|
|
|
- for (Map.Entry<SubClusterId, Float> entry : weights.entrySet()) {
|
|
|
+ for (Map.Entry<SubClusterId, Float> entry : policyWeights.entrySet()) {
|
|
|
if (entry.getValue() > 0
|
|
|
&& activeSubclusters.containsKey(entry.getKey())) {
|
|
|
activeAndEnabledSC.add(entry.getKey());
|
|
@@ -467,7 +479,6 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
|
|
|
totHeadRoomEnabledRMs++;
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -475,7 +486,8 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
|
|
|
* on a per-allocation-id and per-subcluster bases.
|
|
|
*/
|
|
|
private void addLocalizedNodeRR(SubClusterId targetId, ResourceRequest rr) {
|
|
|
- Preconditions.checkArgument(!rr.isAnyLocation(rr.getResourceName()));
|
|
|
+ Preconditions
|
|
|
+ .checkArgument(!ResourceRequest.isAnyLocation(rr.getResourceName()));
|
|
|
|
|
|
if (!countContainersPerRM.containsKey(rr.getAllocationRequestId())) {
|
|
|
countContainersPerRM.put(rr.getAllocationRequestId(), new HashMap<>());
|
|
@@ -488,7 +500,12 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
|
|
|
countContainersPerRM.get(rr.getAllocationRequestId()).get(targetId)
|
|
|
.addAndGet(rr.getNumContainers());
|
|
|
|
|
|
- totNumLocalizedContainers += rr.getNumContainers();
|
|
|
+ if (!totNumLocalizedContainers.containsKey(rr.getAllocationRequestId())) {
|
|
|
+ totNumLocalizedContainers.put(rr.getAllocationRequestId(),
|
|
|
+ new AtomicLong(0));
|
|
|
+ }
|
|
|
+ totNumLocalizedContainers.get(rr.getAllocationRequestId())
|
|
|
+ .addAndGet(rr.getNumContainers());
|
|
|
|
|
|
internalAddToAnswer(targetId, rr);
|
|
|
}
|
|
@@ -497,7 +514,8 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
|
|
|
* Add a rack-local request to the final asnwer.
|
|
|
*/
|
|
|
public void addRackRR(SubClusterId targetId, ResourceRequest rr) {
|
|
|
- Preconditions.checkArgument(!rr.isAnyLocation(rr.getResourceName()));
|
|
|
+ Preconditions
|
|
|
+ .checkArgument(!ResourceRequest.isAnyLocation(rr.getResourceName()));
|
|
|
internalAddToAnswer(targetId, rr);
|
|
|
}
|
|
|
|
|
@@ -505,7 +523,8 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
|
|
|
* Add an ANY request to the final answer.
|
|
|
*/
|
|
|
private void addAnyRR(SubClusterId targetId, ResourceRequest rr) {
|
|
|
- Preconditions.checkArgument(rr.isAnyLocation(rr.getResourceName()));
|
|
|
+ Preconditions
|
|
|
+ .checkArgument(ResourceRequest.isAnyLocation(rr.getResourceName()));
|
|
|
internalAddToAnswer(targetId, rr);
|
|
|
}
|
|
|
|
|
@@ -552,10 +571,12 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Return the total number of container coming from localized requests.
|
|
|
+ * Return the total number of container coming from localized requests
|
|
|
+ * matching an allocation Id.
|
|
|
*/
|
|
|
- private long getTotNumLocalizedContainers() {
|
|
|
- return totNumLocalizedContainers;
|
|
|
+ private long getTotNumLocalizedContainers(long allocationId) {
|
|
|
+ AtomicLong c = totNumLocalizedContainers.get(allocationId);
|
|
|
+ return c == null ? 0 : c.get();
|
|
|
}
|
|
|
|
|
|
/**
|