|
@@ -23,6 +23,7 @@ import java.net.InetSocketAddress;
|
|
import java.security.PrivilegedAction;
|
|
import java.security.PrivilegedAction;
|
|
import java.util.ArrayList;
|
|
import java.util.ArrayList;
|
|
import java.util.Collection;
|
|
import java.util.Collection;
|
|
|
|
+import java.util.Collections;
|
|
import java.util.Comparator;
|
|
import java.util.Comparator;
|
|
import java.util.HashMap;
|
|
import java.util.HashMap;
|
|
import java.util.HashSet;
|
|
import java.util.HashSet;
|
|
@@ -56,6 +57,7 @@ import org.apache.hadoop.yarn.api.records.Priority;
|
|
import org.apache.hadoop.yarn.api.records.Resource;
|
|
import org.apache.hadoop.yarn.api.records.Resource;
|
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
|
import org.apache.hadoop.yarn.client.api.AMRMClient;
|
|
import org.apache.hadoop.yarn.client.api.AMRMClient;
|
|
|
|
+import org.apache.hadoop.yarn.client.api.InvalidContainerRequestException;
|
|
import org.apache.hadoop.yarn.client.api.NMTokenCache;
|
|
import org.apache.hadoop.yarn.client.api.NMTokenCache;
|
|
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
|
|
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
@@ -75,6 +77,8 @@ import com.google.common.base.Preconditions;
|
|
public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
|
public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
|
|
|
|
|
private static final Log LOG = LogFactory.getLog(AMRMClientImpl.class);
|
|
private static final Log LOG = LogFactory.getLog(AMRMClientImpl.class);
|
|
|
|
+ private static final List<String> ANY_LIST =
|
|
|
|
+ Collections.singletonList(ResourceRequest.ANY);
|
|
|
|
|
|
private final RecordFactory recordFactory =
|
|
private final RecordFactory recordFactory =
|
|
RecordFactoryProvider.getRecordFactory(null);
|
|
RecordFactoryProvider.getRecordFactory(null);
|
|
@@ -91,9 +95,10 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
|
LinkedHashSet<T> containerRequests;
|
|
LinkedHashSet<T> containerRequests;
|
|
|
|
|
|
ResourceRequestInfo(Priority priority, String resourceName,
|
|
ResourceRequestInfo(Priority priority, String resourceName,
|
|
- Resource capability) {
|
|
|
|
|
|
+ Resource capability, boolean relaxLocality) {
|
|
remoteRequest = ResourceRequest.newInstance(priority, resourceName,
|
|
remoteRequest = ResourceRequest.newInstance(priority, resourceName,
|
|
capability, 0);
|
|
capability, 0);
|
|
|
|
+ remoteRequest.setRelaxLocality(relaxLocality);
|
|
containerRequests = new LinkedHashSet<T>();
|
|
containerRequests = new LinkedHashSet<T>();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -226,7 +231,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
|
@Override
|
|
@Override
|
|
public AllocateResponse allocate(float progressIndicator)
|
|
public AllocateResponse allocate(float progressIndicator)
|
|
throws YarnException, IOException {
|
|
throws YarnException, IOException {
|
|
- Preconditions.checkArgument(progressIndicator > 0,
|
|
|
|
|
|
+ Preconditions.checkArgument(progressIndicator >= 0,
|
|
"Progress indicator should not be negative");
|
|
"Progress indicator should not be negative");
|
|
AllocateResponse allocateResponse = null;
|
|
AllocateResponse allocateResponse = null;
|
|
ArrayList<ResourceRequest> askList = null;
|
|
ArrayList<ResourceRequest> askList = null;
|
|
@@ -326,17 +331,30 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
|
public synchronized void addContainerRequest(T req) {
|
|
public synchronized void addContainerRequest(T req) {
|
|
Preconditions.checkArgument(req != null,
|
|
Preconditions.checkArgument(req != null,
|
|
"Resource request can not be null.");
|
|
"Resource request can not be null.");
|
|
- Set<String> allRacks = new HashSet<String>();
|
|
|
|
|
|
+ Set<String> dedupedRacks = new HashSet<String>();
|
|
if (req.getRacks() != null) {
|
|
if (req.getRacks() != null) {
|
|
- allRacks.addAll(req.getRacks());
|
|
|
|
- if(req.getRacks().size() != allRacks.size()) {
|
|
|
|
|
|
+ dedupedRacks.addAll(req.getRacks());
|
|
|
|
+ if(req.getRacks().size() != dedupedRacks.size()) {
|
|
Joiner joiner = Joiner.on(',');
|
|
Joiner joiner = Joiner.on(',');
|
|
LOG.warn("ContainerRequest has duplicate racks: "
|
|
LOG.warn("ContainerRequest has duplicate racks: "
|
|
+ joiner.join(req.getRacks()));
|
|
+ joiner.join(req.getRacks()));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- allRacks.addAll(resolveRacks(req.getNodes()));
|
|
|
|
-
|
|
|
|
|
|
+ Set<String> inferredRacks = resolveRacks(req.getNodes());
|
|
|
|
+ inferredRacks.removeAll(dedupedRacks);
|
|
|
|
+
|
|
|
|
+ // check that specific and non-specific requests cannot be mixed within a
|
|
|
|
+ // priority
|
|
|
|
+ checkLocalityRelaxationConflict(req.getPriority(), ANY_LIST,
|
|
|
|
+ req.getRelaxLocality());
|
|
|
|
+ // check that specific rack cannot be mixed with specific node within a
|
|
|
|
+ // priority. If node and its rack are both specified then they must be
|
|
|
|
+ // in the same request.
|
|
|
|
+ // For explicitly requested racks, we set locality relaxation to true
|
|
|
|
+ checkLocalityRelaxationConflict(req.getPriority(), dedupedRacks, true);
|
|
|
|
+ checkLocalityRelaxationConflict(req.getPriority(), inferredRacks,
|
|
|
|
+ req.getRelaxLocality());
|
|
|
|
+
|
|
if (req.getNodes() != null) {
|
|
if (req.getNodes() != null) {
|
|
HashSet<String> dedupedNodes = new HashSet<String>(req.getNodes());
|
|
HashSet<String> dedupedNodes = new HashSet<String>(req.getNodes());
|
|
if(dedupedNodes.size() != req.getNodes().size()) {
|
|
if(dedupedNodes.size() != req.getNodes().size()) {
|
|
@@ -345,21 +363,26 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
|
+ joiner.join(req.getNodes()));
|
|
+ joiner.join(req.getNodes()));
|
|
}
|
|
}
|
|
for (String node : dedupedNodes) {
|
|
for (String node : dedupedNodes) {
|
|
- // Ensure node requests are accompanied by requests for
|
|
|
|
- // corresponding rack
|
|
|
|
addResourceRequest(req.getPriority(), node, req.getCapability(),
|
|
addResourceRequest(req.getPriority(), node, req.getCapability(),
|
|
- req.getContainerCount(), req);
|
|
|
|
|
|
+ req.getContainerCount(), req, true);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- for (String rack : allRacks) {
|
|
|
|
|
|
+ for (String rack : dedupedRacks) {
|
|
addResourceRequest(req.getPriority(), rack, req.getCapability(),
|
|
addResourceRequest(req.getPriority(), rack, req.getCapability(),
|
|
- req.getContainerCount(), req);
|
|
|
|
|
|
+ req.getContainerCount(), req, true);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Ensure node requests are accompanied by requests for
|
|
|
|
+ // corresponding rack
|
|
|
|
+ for (String rack : inferredRacks) {
|
|
|
|
+ addResourceRequest(req.getPriority(), rack, req.getCapability(),
|
|
|
|
+ req.getContainerCount(), req, req.getRelaxLocality());
|
|
}
|
|
}
|
|
|
|
|
|
// Off-switch
|
|
// Off-switch
|
|
addResourceRequest(req.getPriority(), ResourceRequest.ANY, req.getCapability(),
|
|
addResourceRequest(req.getPriority(), ResourceRequest.ANY, req.getCapability(),
|
|
- req.getContainerCount(), req);
|
|
|
|
|
|
+ req.getContainerCount(), req, req.getRelaxLocality());
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
@@ -428,7 +451,8 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
|
}
|
|
}
|
|
|
|
|
|
ResourceRequestInfo resourceRequestInfo = reqMap.get(capability);
|
|
ResourceRequestInfo resourceRequestInfo = reqMap.get(capability);
|
|
- if (resourceRequestInfo != null) {
|
|
|
|
|
|
+ if (resourceRequestInfo != null &&
|
|
|
|
+ !resourceRequestInfo.containerRequests.isEmpty()) {
|
|
list.add(resourceRequestInfo.containerRequests);
|
|
list.add(resourceRequestInfo.containerRequests);
|
|
return list;
|
|
return list;
|
|
}
|
|
}
|
|
@@ -438,7 +462,8 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
|
SortedMap<Resource, ResourceRequestInfo> tailMap =
|
|
SortedMap<Resource, ResourceRequestInfo> tailMap =
|
|
reqMap.tailMap(capability);
|
|
reqMap.tailMap(capability);
|
|
for(Map.Entry<Resource, ResourceRequestInfo> entry : tailMap.entrySet()) {
|
|
for(Map.Entry<Resource, ResourceRequestInfo> entry : tailMap.entrySet()) {
|
|
- if(canFit(entry.getKey(), capability)) {
|
|
|
|
|
|
+ if (canFit(entry.getKey(), capability) &&
|
|
|
|
+ !entry.getValue().containerRequests.isEmpty()) {
|
|
// match found that fits in the larger resource
|
|
// match found that fits in the larger resource
|
|
list.add(entry.getValue().containerRequests);
|
|
list.add(entry.getValue().containerRequests);
|
|
}
|
|
}
|
|
@@ -466,6 +491,33 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
|
return racks;
|
|
return racks;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+  /**
+   * Verifies that a new ContainerRequest does not contradict the locality
+   * relaxation already recorded for the given locations at this priority.
+   * ContainerRequests with locality relaxation cannot be made at the same
+   * priority as ContainerRequests without locality relaxation.
+   *
+   * @param priority priority whose outstanding requests are inspected; if
+   *          nothing is recorded for this priority the check passes
+   * @param locations resource names to check (the visible callers pass
+   *          rack names or ResourceRequest.ANY)
+   * @param relaxLocality locality relaxation the new request would use for
+   *          each of the locations
+   * @throws InvalidContainerRequestException if a location already has a
+   *           request whose locality relaxation differs from relaxLocality
+   */
+  private void checkLocalityRelaxationConflict(Priority priority,
+      Collection<String> locations, boolean relaxLocality) {
+    Map<String, TreeMap<Resource, ResourceRequestInfo>> remoteRequests =
+        this.remoteRequestsTable.get(priority);
+    if (remoteRequests == null) {
+      return;
+    }
+    // Locality relaxation will be set to relaxLocality for all implicitly
+    // requested racks. Make sure that existing rack requests match this.
+    for (String location : locations) {
+      TreeMap<Resource, ResourceRequestInfo> reqs =
+          remoteRequests.get(location);
+      if (reqs == null || reqs.isEmpty()) {
+        continue;
+      }
+      // Only the first entry is consulted. NOTE(review): presumably every
+      // entry for a location shares one relaxation setting because this
+      // check rejects mismatches before they are stored - confirm.
+      boolean existingRelaxLocality =
+          reqs.values().iterator().next().remoteRequest.getRelaxLocality();
+      if (existingRelaxLocality != relaxLocality) {
+        // Report the existing value, not the requested one, so the message
+        // actually shows the two conflicting settings.
+        throw new InvalidContainerRequestException("Cannot submit a "
+            + "ContainerRequest asking for location " + location
+            + " with locality relaxation " + relaxLocality + " when it has "
+            + "already been requested with locality relaxation "
+            + existingRelaxLocality);
+      }
+    }
+  }
|
|
|
|
+
|
|
private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
|
|
private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
|
|
// This code looks weird but is needed because of the following scenario.
|
|
// This code looks weird but is needed because of the following scenario.
|
|
// A ResourceRequest is removed from the remoteRequestTable. A 0 container
|
|
// A ResourceRequest is removed from the remoteRequestTable. A 0 container
|
|
@@ -484,7 +536,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
|
}
|
|
}
|
|
|
|
|
|
private void addResourceRequest(Priority priority, String resourceName,
|
|
private void addResourceRequest(Priority priority, String resourceName,
|
|
- Resource capability, int containerCount, T req) {
|
|
|
|
|
|
+ Resource capability, int containerCount, T req, boolean relaxLocality) {
|
|
Map<String, TreeMap<Resource, ResourceRequestInfo>> remoteRequests =
|
|
Map<String, TreeMap<Resource, ResourceRequestInfo>> remoteRequests =
|
|
this.remoteRequestsTable.get(priority);
|
|
this.remoteRequestsTable.get(priority);
|
|
if (remoteRequests == null) {
|
|
if (remoteRequests == null) {
|
|
@@ -506,14 +558,15 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
|
ResourceRequestInfo resourceRequestInfo = reqMap.get(capability);
|
|
ResourceRequestInfo resourceRequestInfo = reqMap.get(capability);
|
|
if (resourceRequestInfo == null) {
|
|
if (resourceRequestInfo == null) {
|
|
resourceRequestInfo =
|
|
resourceRequestInfo =
|
|
- new ResourceRequestInfo(priority, resourceName, capability);
|
|
|
|
|
|
+ new ResourceRequestInfo(priority, resourceName, capability,
|
|
|
|
+ relaxLocality);
|
|
reqMap.put(capability, resourceRequestInfo);
|
|
reqMap.put(capability, resourceRequestInfo);
|
|
}
|
|
}
|
|
|
|
|
|
resourceRequestInfo.remoteRequest.setNumContainers(
|
|
resourceRequestInfo.remoteRequest.setNumContainers(
|
|
resourceRequestInfo.remoteRequest.getNumContainers() + containerCount);
|
|
resourceRequestInfo.remoteRequest.getNumContainers() + containerCount);
|
|
|
|
|
|
- if(req instanceof StoredContainerRequest) {
|
|
|
|
|
|
+ if (req instanceof StoredContainerRequest && relaxLocality) {
|
|
resourceRequestInfo.containerRequests.add(req);
|
|
resourceRequestInfo.containerRequests.add(req);
|
|
}
|
|
}
|
|
|
|
|