|
@@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
|
|
|
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
|
|
import org.apache.hadoop.yarn.api.records.NMToken;
|
|
|
import org.apache.hadoop.yarn.api.records.Priority;
|
|
|
+import org.apache.hadoop.yarn.api.records.ProfileCapability;
|
|
|
import org.apache.hadoop.yarn.api.records.Resource;
|
|
|
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
|
|
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
|
@@ -105,56 +106,56 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
|
|
protected final Set<String> blacklistedNodes = new HashSet<String>();
|
|
|
protected final Set<String> blacklistAdditions = new HashSet<String>();
|
|
|
protected final Set<String> blacklistRemovals = new HashSet<String>();
|
|
|
+
|
|
|
+ protected Map<String, Resource> resourceProfilesMap;
|
|
|
|
|
|
static class ResourceRequestInfo<T> {
|
|
|
ResourceRequest remoteRequest;
|
|
|
LinkedHashSet<T> containerRequests;
|
|
|
-
|
|
|
+
|
|
|
ResourceRequestInfo(Long allocationRequestId, Priority priority,
|
|
|
- String resourceName, Resource capability, boolean relaxLocality) {
|
|
|
+ String resourceName, Resource capability, boolean relaxLocality,
|
|
|
+ String resourceProfile) {
|
|
|
+ ProfileCapability profileCapability = ProfileCapability
|
|
|
+ .newInstance(resourceProfile, capability);
|
|
|
remoteRequest = ResourceRequest.newBuilder().priority(priority)
|
|
|
.resourceName(resourceName).capability(capability).numContainers(0)
|
|
|
- .allocationRequestId(allocationRequestId)
|
|
|
- .relaxLocality(relaxLocality).build();
|
|
|
+ .allocationRequestId(allocationRequestId).relaxLocality(relaxLocality)
|
|
|
+ .profileCapability(profileCapability).build();
|
|
|
containerRequests = new LinkedHashSet<T>();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Class compares Resource by memory then cpu in reverse order
|
|
|
+ * Class compares Resource by memory, then cpu and then the remaining resource
|
|
|
+ * types in reverse order.
|
|
|
*/
|
|
|
- static class ResourceReverseMemoryThenCpuComparator implements
|
|
|
- Comparator<Resource>, Serializable {
|
|
|
- static final long serialVersionUID = 12345L;
|
|
|
- @Override
|
|
|
- public int compare(Resource arg0, Resource arg1) {
|
|
|
- long mem0 = arg0.getMemorySize();
|
|
|
- long mem1 = arg1.getMemorySize();
|
|
|
- long cpu0 = arg0.getVirtualCores();
|
|
|
- long cpu1 = arg1.getVirtualCores();
|
|
|
- if(mem0 == mem1) {
|
|
|
- if(cpu0 == cpu1) {
|
|
|
- return 0;
|
|
|
- }
|
|
|
- if(cpu0 < cpu1) {
|
|
|
- return 1;
|
|
|
- }
|
|
|
- return -1;
|
|
|
- }
|
|
|
- if(mem0 < mem1) {
|
|
|
- return 1;
|
|
|
- }
|
|
|
- return -1;
|
|
|
- }
|
|
|
+ static class ProfileCapabilityComparator<T extends ProfileCapability>
|
|
|
+ implements Comparator<T> {
|
|
|
+
|
|
|
+ HashMap<String, Resource> resourceProfilesMap;
|
|
|
+
|
|
|
+ public ProfileCapabilityComparator(
|
|
|
+ HashMap<String, Resource> resourceProfileMap) {
|
|
|
+ this.resourceProfilesMap = resourceProfileMap;
|
|
|
+ }
|
|
|
+
|
|
|
+ public int compare(T arg0, T arg1) {
|
|
|
+ Resource resource0 =
|
|
|
+ ProfileCapability.toResource(arg0, resourceProfilesMap);
|
|
|
+ Resource resource1 =
|
|
|
+ ProfileCapability.toResource(arg1, resourceProfilesMap);
|
|
|
+ return resource1.compareTo(resource0);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- static boolean canFit(Resource arg0, Resource arg1) {
|
|
|
- long mem0 = arg0.getMemorySize();
|
|
|
- long mem1 = arg1.getMemorySize();
|
|
|
- long cpu0 = arg0.getVirtualCores();
|
|
|
- long cpu1 = arg1.getVirtualCores();
|
|
|
-
|
|
|
- return (mem0 <= mem1 && cpu0 <= cpu1);
|
|
|
+ boolean canFit(ProfileCapability arg0, ProfileCapability arg1) {
|
|
|
+ Resource resource0 =
|
|
|
+ ProfileCapability.toResource(arg0, resourceProfilesMap);
|
|
|
+ Resource resource1 =
|
|
|
+ ProfileCapability.toResource(arg1, resourceProfilesMap);
|
|
|
+ return Resources.fitsIn(resource0, resource1);
|
|
|
+
|
|
|
}
|
|
|
|
|
|
private final Map<Long, RemoteRequestsTable<T>> remoteRequests =
|
|
@@ -232,6 +233,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
|
|
return registerApplicationMaster();
|
|
|
}
|
|
|
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
private RegisterApplicationMasterResponse registerApplicationMaster()
|
|
|
throws YarnException, IOException {
|
|
|
RegisterApplicationMasterRequest request =
|
|
@@ -244,6 +246,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
|
|
if (!response.getNMTokensFromPreviousAttempts().isEmpty()) {
|
|
|
populateNMTokens(response.getNMTokensFromPreviousAttempts());
|
|
|
}
|
|
|
+ this.resourceProfilesMap = response.getResourceProfiles();
|
|
|
}
|
|
|
return response;
|
|
|
}
|
|
@@ -416,13 +419,15 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
|
|
for(ResourceRequest r : ask) {
|
|
|
// create a copy of ResourceRequest as we might change it while the
|
|
|
// RPC layer is using it to send info across
|
|
|
- ResourceRequest rr = ResourceRequest.newBuilder()
|
|
|
- .priority(r.getPriority()).resourceName(r.getResourceName())
|
|
|
- .capability(r.getCapability()).numContainers(r.getNumContainers())
|
|
|
- .relaxLocality(r.getRelaxLocality())
|
|
|
- .nodeLabelExpression(r.getNodeLabelExpression())
|
|
|
- .executionTypeRequest(r.getExecutionTypeRequest())
|
|
|
- .allocationRequestId(r.getAllocationRequestId()).build();
|
|
|
+ ResourceRequest rr =
|
|
|
+ ResourceRequest.newBuilder().priority(r.getPriority())
|
|
|
+ .resourceName(r.getResourceName()).capability(r.getCapability())
|
|
|
+ .numContainers(r.getNumContainers())
|
|
|
+ .relaxLocality(r.getRelaxLocality())
|
|
|
+ .nodeLabelExpression(r.getNodeLabelExpression())
|
|
|
+ .executionTypeRequest(r.getExecutionTypeRequest())
|
|
|
+ .allocationRequestId(r.getAllocationRequestId())
|
|
|
+ .profileCapability(r.getProfileCapability()).build();
|
|
|
askList.add(rr);
|
|
|
}
|
|
|
return askList;
|
|
@@ -504,6 +509,8 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
|
|
public synchronized void addContainerRequest(T req) {
|
|
|
Preconditions.checkArgument(req != null,
|
|
|
"Resource request can not be null.");
|
|
|
+ ProfileCapability profileCapability = ProfileCapability
|
|
|
+ .newInstance(req.getResourceProfile(), req.getCapability());
|
|
|
Set<String> dedupedRacks = new HashSet<String>();
|
|
|
if (req.getRacks() != null) {
|
|
|
dedupedRacks.addAll(req.getRacks());
|
|
@@ -516,6 +523,8 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
|
|
Set<String> inferredRacks = resolveRacks(req.getNodes());
|
|
|
inferredRacks.removeAll(dedupedRacks);
|
|
|
|
|
|
+ checkResourceProfile(req.getResourceProfile());
|
|
|
+
|
|
|
// check that specific and non-specific requests cannot be mixed within a
|
|
|
// priority
|
|
|
checkLocalityRelaxationConflict(req.getAllocationRequestId(),
|
|
@@ -540,26 +549,26 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
|
|
}
|
|
|
for (String node : dedupedNodes) {
|
|
|
addResourceRequest(req.getPriority(), node,
|
|
|
- req.getExecutionTypeRequest(), req.getCapability(), req, true,
|
|
|
+ req.getExecutionTypeRequest(), profileCapability, req, true,
|
|
|
req.getNodeLabelExpression());
|
|
|
}
|
|
|
}
|
|
|
|
|
|
for (String rack : dedupedRacks) {
|
|
|
addResourceRequest(req.getPriority(), rack, req.getExecutionTypeRequest(),
|
|
|
- req.getCapability(), req, true, req.getNodeLabelExpression());
|
|
|
+ profileCapability, req, true, req.getNodeLabelExpression());
|
|
|
}
|
|
|
|
|
|
// Ensure node requests are accompanied by requests for
|
|
|
// corresponding rack
|
|
|
for (String rack : inferredRacks) {
|
|
|
addResourceRequest(req.getPriority(), rack, req.getExecutionTypeRequest(),
|
|
|
- req.getCapability(), req, req.getRelaxLocality(),
|
|
|
+ profileCapability, req, req.getRelaxLocality(),
|
|
|
req.getNodeLabelExpression());
|
|
|
}
|
|
|
// Off-switch
|
|
|
addResourceRequest(req.getPriority(), ResourceRequest.ANY,
|
|
|
- req.getExecutionTypeRequest(), req.getCapability(), req,
|
|
|
+ req.getExecutionTypeRequest(), profileCapability, req,
|
|
|
req.getRelaxLocality(), req.getNodeLabelExpression());
|
|
|
}
|
|
|
|
|
@@ -567,6 +576,8 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
|
|
public synchronized void removeContainerRequest(T req) {
|
|
|
Preconditions.checkArgument(req != null,
|
|
|
"Resource request can not be null.");
|
|
|
+ ProfileCapability profileCapability = ProfileCapability
|
|
|
+ .newInstance(req.getResourceProfile(), req.getCapability());
|
|
|
Set<String> allRacks = new HashSet<String>();
|
|
|
if (req.getRacks() != null) {
|
|
|
allRacks.addAll(req.getRacks());
|
|
@@ -577,17 +588,17 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
|
|
if (req.getNodes() != null) {
|
|
|
for (String node : new HashSet<String>(req.getNodes())) {
|
|
|
decResourceRequest(req.getPriority(), node,
|
|
|
- req.getExecutionTypeRequest(), req.getCapability(), req);
|
|
|
+ req.getExecutionTypeRequest(), profileCapability, req);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
for (String rack : allRacks) {
|
|
|
decResourceRequest(req.getPriority(), rack,
|
|
|
- req.getExecutionTypeRequest(), req.getCapability(), req);
|
|
|
+ req.getExecutionTypeRequest(), profileCapability, req);
|
|
|
}
|
|
|
|
|
|
decResourceRequest(req.getPriority(), ResourceRequest.ANY,
|
|
|
- req.getExecutionTypeRequest(), req.getCapability(), req);
|
|
|
+ req.getExecutionTypeRequest(), profileCapability, req);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -660,6 +671,17 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
|
|
public synchronized List<? extends Collection<T>> getMatchingRequests(
|
|
|
Priority priority, String resourceName, ExecutionType executionType,
|
|
|
Resource capability) {
|
|
|
+ ProfileCapability profileCapability =
|
|
|
+ ProfileCapability.newInstance(capability);
|
|
|
+ return getMatchingRequests(priority, resourceName, executionType,
|
|
|
+ profileCapability);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
+ public synchronized List<? extends Collection<T>> getMatchingRequests(
|
|
|
+ Priority priority, String resourceName, ExecutionType executionType,
|
|
|
+ ProfileCapability capability) {
|
|
|
Preconditions.checkArgument(capability != null,
|
|
|
"The Resource to be requested should not be null ");
|
|
|
Preconditions.checkArgument(priority != null,
|
|
@@ -669,22 +691,22 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
|
|
RemoteRequestsTable remoteRequestsTable = getTable(0);
|
|
|
|
|
|
if (null != remoteRequestsTable) {
|
|
|
- List<ResourceRequestInfo<T>> matchingRequests =
|
|
|
- remoteRequestsTable.getMatchingRequests(priority, resourceName,
|
|
|
- executionType, capability);
|
|
|
+ List<ResourceRequestInfo<T>> matchingRequests = remoteRequestsTable
|
|
|
+ .getMatchingRequests(priority, resourceName, executionType,
|
|
|
+ capability);
|
|
|
if (null != matchingRequests) {
|
|
|
// If no exact match. Container may be larger than what was requested.
|
|
|
// get all resources <= capability. map is reverse sorted.
|
|
|
for (ResourceRequestInfo<T> resReqInfo : matchingRequests) {
|
|
|
- if (canFit(resReqInfo.remoteRequest.getCapability(), capability) &&
|
|
|
- !resReqInfo.containerRequests.isEmpty()) {
|
|
|
+ if (canFit(resReqInfo.remoteRequest.getProfileCapability(),
|
|
|
+ capability) && !resReqInfo.containerRequests.isEmpty()) {
|
|
|
list.add(resReqInfo.containerRequests);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
// no match found
|
|
|
- return list;
|
|
|
+ return list;
|
|
|
}
|
|
|
|
|
|
private Set<String> resolveRacks(List<String> nodes) {
|
|
@@ -732,6 +754,15 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ private void checkResourceProfile(String profile) {
|
|
|
+ if (resourceProfilesMap != null && !resourceProfilesMap.isEmpty()
|
|
|
+ && !resourceProfilesMap.containsKey(profile)) {
|
|
|
+ throw new InvalidContainerRequestException(
|
|
|
+ "Invalid profile name, valid profile names are " + resourceProfilesMap
|
|
|
+ .keySet());
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
/**
|
|
|
* Valid if a node label expression specified on container request is valid or
|
|
@@ -788,12 +819,16 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
|
|
}
|
|
|
|
|
|
private void addResourceRequest(Priority priority, String resourceName,
|
|
|
- ExecutionTypeRequest execTypeReq, Resource capability, T req,
|
|
|
+ ExecutionTypeRequest execTypeReq, ProfileCapability capability, T req,
|
|
|
boolean relaxLocality, String labelExpression) {
|
|
|
RemoteRequestsTable<T> remoteRequestsTable =
|
|
|
getTable(req.getAllocationRequestId());
|
|
|
if (remoteRequestsTable == null) {
|
|
|
remoteRequestsTable = new RemoteRequestsTable<T>();
|
|
|
+ if (this.resourceProfilesMap instanceof HashMap) {
|
|
|
+ remoteRequestsTable.setResourceComparator(
|
|
|
+ new ProfileCapabilityComparator((HashMap) resourceProfilesMap));
|
|
|
+ }
|
|
|
putTable(req.getAllocationRequestId(), remoteRequestsTable);
|
|
|
}
|
|
|
@SuppressWarnings("unchecked")
|
|
@@ -806,6 +841,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
|
|
addResourceRequestToAsk(resourceRequestInfo.remoteRequest);
|
|
|
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Adding request to ask " + resourceRequestInfo.remoteRequest);
|
|
|
LOG.debug("addResourceRequest:" + " applicationId="
|
|
|
+ " priority=" + priority.getPriority()
|
|
|
+ " resourceName=" + resourceName + " numContainers="
|
|
@@ -815,7 +851,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
|
|
}
|
|
|
|
|
|
private void decResourceRequest(Priority priority, String resourceName,
|
|
|
- ExecutionTypeRequest execTypeReq, Resource capability, T req) {
|
|
|
+ ExecutionTypeRequest execTypeReq, ProfileCapability capability, T req) {
|
|
|
RemoteRequestsTable<T> remoteRequestsTable =
|
|
|
getTable(req.getAllocationRequestId());
|
|
|
if (remoteRequestsTable != null) {
|
|
@@ -825,7 +861,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
|
|
execTypeReq, capability, req);
|
|
|
// send the ResourceRequest to RM even if is 0 because it needs to
|
|
|
// override a previously sent value. If ResourceRequest was not sent
|
|
|
- // previously then sending 0 ought to be a no-op on RM
|
|
|
+ // previously then sending 0 ought to be a no-op on RM
|
|
|
if (resourceRequestInfo != null) {
|
|
|
addResourceRequestToAsk(resourceRequestInfo.remoteRequest);
|
|
|
|