|
@@ -26,6 +26,7 @@ import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.util.StringUtils;
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
|
import org.apache.hadoop.yarn.api.records.Resource;
|
|
|
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
|
|
import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException;
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
|
@@ -36,10 +37,8 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin
|
|
|
import java.io.IOException;
|
|
|
import java.io.Serializable;
|
|
|
import java.util.ArrayList;
|
|
|
-import java.util.Collection;
|
|
|
import java.util.Collections;
|
|
|
import java.util.HashMap;
|
|
|
-import java.util.HashSet;
|
|
|
import java.util.Iterator;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
@@ -54,6 +53,7 @@ import static org.apache.hadoop.yarn.api.records.ResourceInformation.GPU_URI;
|
|
|
*/
|
|
|
public class GpuResourceAllocator {
|
|
|
final static Log LOG = LogFactory.getLog(GpuResourceAllocator.class);
|
|
|
+ private static final int WAIT_MS_PER_LOOP = 1000;
|
|
|
|
|
|
private Set<GpuDevice> allowedGpuDevices = new TreeSet<>();
|
|
|
private Map<GpuDevice, ContainerId> usedDevices = new TreeMap<>();
|
|
@@ -168,13 +168,58 @@ public class GpuResourceAllocator {
|
|
|
* @return allocation results.
|
|
|
* @throws ResourceHandlerException When failed to assign GPUs.
|
|
|
*/
|
|
|
- public synchronized GpuAllocation assignGpus(Container container)
|
|
|
+ public GpuAllocation assignGpus(Container container)
|
|
|
+ throws ResourceHandlerException {
|
|
|
+ GpuAllocation allocation = internalAssignGpus(container);
|
|
|
+
|
|
|
+ // Wait for a maximum of 120 seconds if no available GPU are there which
|
|
|
+ // are yet to be released.
|
|
|
+ final int timeoutMsecs = 120 * WAIT_MS_PER_LOOP;
|
|
|
+ int timeWaiting = 0;
|
|
|
+ while (allocation == null) {
|
|
|
+ if (timeWaiting >= timeoutMsecs) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ // Sleep for 1 sec to ensure there are some free GPU devices which are
|
|
|
+ // getting released.
|
|
|
+ try {
|
|
|
+ LOG.info("Container : " + container.getContainerId()
|
|
|
+ + " is waiting for free GPU devices.");
|
|
|
+ Thread.sleep(WAIT_MS_PER_LOOP);
|
|
|
+ timeWaiting += WAIT_MS_PER_LOOP;
|
|
|
+ allocation = internalAssignGpus(container);
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ // On any interrupt, break the loop and continue execution.
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if(allocation == null) {
|
|
|
+ String message = "Could not get valid GPU device for container '" +
|
|
|
+ container.getContainerId()
|
|
|
+ + "' as some other containers might not releasing GPUs.";
|
|
|
+ LOG.warn(message);
|
|
|
+ throw new ResourceHandlerException(message);
|
|
|
+ }
|
|
|
+ return allocation;
|
|
|
+ }
|
|
|
+
|
|
|
+ private synchronized GpuAllocation internalAssignGpus(Container container)
|
|
|
throws ResourceHandlerException {
|
|
|
Resource requestedResource = container.getResource();
|
|
|
ContainerId containerId = container.getContainerId();
|
|
|
int numRequestedGpuDevices = getRequestedGpus(requestedResource);
|
|
|
// Assign Gpus to container if requested some.
|
|
|
if (numRequestedGpuDevices > 0) {
|
|
|
+ if (numRequestedGpuDevices > getAvailableGpus()) {
|
|
|
+ // If there are some devices which are getting released, wait for few
|
|
|
+ // seconds to get it.
|
|
|
+ if (numRequestedGpuDevices <= getReleasingGpus() + getAvailableGpus()) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
if (numRequestedGpuDevices > getAvailableGpus()) {
|
|
|
throw new ResourceHandlerException(
|
|
|
getResourceHandlerExceptionMessage(numRequestedGpuDevices,
|
|
@@ -211,6 +256,23 @@ public class GpuResourceAllocator {
|
|
|
return new GpuAllocation(null, allowedGpuDevices);
|
|
|
}
|
|
|
|
|
|
+ private synchronized long getReleasingGpus() {
|
|
|
+ long releasingGpus = 0;
|
|
|
+ Iterator<Map.Entry<GpuDevice, ContainerId>> iter = usedDevices.entrySet()
|
|
|
+ .iterator();
|
|
|
+ while (iter.hasNext()) {
|
|
|
+ ContainerId containerId = iter.next().getValue();
|
|
|
+ Container container;
|
|
|
+ if ((container = nmContext.getContainers().get(containerId)) != null) {
|
|
|
+ if (container.isContainerInFinalStates()) {
|
|
|
+ releasingGpus = releasingGpus + container.getResource()
|
|
|
+ .getResourceInformation(ResourceInformation.GPU_URI).getValue();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return releasingGpus;
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Clean up all Gpus assigned to containerId
|
|
|
* @param containerId containerId
|