|
@@ -22,6 +22,8 @@ import java.io.IOException;
|
|
|
import java.util.*;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|
|
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
|
|
|
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
@@ -82,8 +84,9 @@ public abstract class AbstractYarnScheduler
|
|
|
private Resource configuredMaximumAllocation;
|
|
|
private int maxNodeMemory = -1;
|
|
|
private int maxNodeVCores = -1;
|
|
|
- private ReentrantReadWriteLock maximumAllocationLock =
|
|
|
- new ReentrantReadWriteLock();
|
|
|
+ private final ReadLock maxAllocReadLock;
|
|
|
+ private final WriteLock maxAllocWriteLock;
|
|
|
+
|
|
|
private boolean useConfiguredMaximumAllocationOnly = true;
|
|
|
private long configuredMaximumAllocationWaitTime;
|
|
|
|
|
@@ -103,6 +106,9 @@ public abstract class AbstractYarnScheduler
|
|
|
*/
|
|
|
public AbstractYarnScheduler(String name) {
|
|
|
super(name);
|
|
|
+ ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
|
|
|
+ this.maxAllocReadLock = lock.readLock();
|
|
|
+ this.maxAllocWriteLock = lock.writeLock();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -157,8 +163,7 @@ public abstract class AbstractYarnScheduler
|
|
|
@Override
|
|
|
public Resource getMaximumResourceCapability() {
|
|
|
Resource maxResource;
|
|
|
- ReentrantReadWriteLock.ReadLock readLock = maximumAllocationLock.readLock();
|
|
|
- readLock.lock();
|
|
|
+ maxAllocReadLock.lock();
|
|
|
try {
|
|
|
if (useConfiguredMaximumAllocationOnly) {
|
|
|
if (System.currentTimeMillis() - ResourceManager.getClusterTimeStamp()
|
|
@@ -170,22 +175,20 @@ public abstract class AbstractYarnScheduler
|
|
|
maxResource = Resources.clone(maximumAllocation);
|
|
|
}
|
|
|
} finally {
|
|
|
- readLock.unlock();
|
|
|
+ maxAllocReadLock.unlock();
|
|
|
}
|
|
|
return maxResource;
|
|
|
}
|
|
|
|
|
|
protected void initMaximumResourceCapability(Resource maximumAllocation) {
|
|
|
- ReentrantReadWriteLock.WriteLock writeLock =
|
|
|
- maximumAllocationLock.writeLock();
|
|
|
- writeLock.lock();
|
|
|
+ maxAllocWriteLock.lock();
|
|
|
try {
|
|
|
if (this.configuredMaximumAllocation == null) {
|
|
|
this.configuredMaximumAllocation = Resources.clone(maximumAllocation);
|
|
|
this.maximumAllocation = Resources.clone(maximumAllocation);
|
|
|
}
|
|
|
} finally {
|
|
|
- writeLock.unlock();
|
|
|
+ maxAllocWriteLock.unlock();
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -535,19 +538,24 @@ public abstract class AbstractYarnScheduler
|
|
|
*/
|
|
|
public synchronized void updateNodeResource(RMNode nm,
|
|
|
ResourceOption resourceOption) {
|
|
|
-
|
|
|
SchedulerNode node = getSchedulerNode(nm.getNodeID());
|
|
|
Resource newResource = resourceOption.getResource();
|
|
|
Resource oldResource = node.getTotalResource();
|
|
|
if(!oldResource.equals(newResource)) {
|
|
|
// Log resource change
|
|
|
- LOG.info("Update resource on node: " + node.getNodeName()
|
|
|
+ LOG.info("Update resource on node: " + node.getNodeName()
|
|
|
+ " from: " + oldResource + ", to: "
|
|
|
+ newResource);
|
|
|
|
|
|
+ nodes.remove(nm.getNodeID());
|
|
|
+ updateMaximumAllocation(node, false);
|
|
|
+
|
|
|
// update resource to node
|
|
|
node.setTotalResource(newResource);
|
|
|
-
|
|
|
+
|
|
|
+ nodes.put(nm.getNodeID(), (N)node);
|
|
|
+ updateMaximumAllocation(node, true);
|
|
|
+
|
|
|
// update resource to clusterResource
|
|
|
Resources.subtractFrom(clusterResource, oldResource);
|
|
|
Resources.addTo(clusterResource, newResource);
|
|
@@ -571,28 +579,27 @@ public abstract class AbstractYarnScheduler
|
|
|
}
|
|
|
|
|
|
protected void updateMaximumAllocation(SchedulerNode node, boolean add) {
|
|
|
- ReentrantReadWriteLock.WriteLock writeLock =
|
|
|
- maximumAllocationLock.writeLock();
|
|
|
- writeLock.lock();
|
|
|
+ Resource totalResource = node.getTotalResource();
|
|
|
+ maxAllocWriteLock.lock();
|
|
|
try {
|
|
|
if (add) { // added node
|
|
|
- int nodeMemory = node.getTotalResource().getMemory();
|
|
|
+ int nodeMemory = totalResource.getMemory();
|
|
|
if (nodeMemory > maxNodeMemory) {
|
|
|
maxNodeMemory = nodeMemory;
|
|
|
maximumAllocation.setMemory(Math.min(
|
|
|
configuredMaximumAllocation.getMemory(), maxNodeMemory));
|
|
|
}
|
|
|
- int nodeVCores = node.getTotalResource().getVirtualCores();
|
|
|
+ int nodeVCores = totalResource.getVirtualCores();
|
|
|
if (nodeVCores > maxNodeVCores) {
|
|
|
maxNodeVCores = nodeVCores;
|
|
|
maximumAllocation.setVirtualCores(Math.min(
|
|
|
configuredMaximumAllocation.getVirtualCores(), maxNodeVCores));
|
|
|
}
|
|
|
} else { // removed node
|
|
|
- if (maxNodeMemory == node.getTotalResource().getMemory()) {
|
|
|
+ if (maxNodeMemory == totalResource.getMemory()) {
|
|
|
maxNodeMemory = -1;
|
|
|
}
|
|
|
- if (maxNodeVCores == node.getTotalResource().getVirtualCores()) {
|
|
|
+ if (maxNodeVCores == totalResource.getVirtualCores()) {
|
|
|
maxNodeVCores = -1;
|
|
|
}
|
|
|
// We only have to iterate through the nodes if the current max memory
|
|
@@ -625,7 +632,7 @@ public abstract class AbstractYarnScheduler
|
|
|
}
|
|
|
}
|
|
|
} finally {
|
|
|
- writeLock.unlock();
|
|
|
+ maxAllocWriteLock.unlock();
|
|
|
}
|
|
|
}
|
|
|
}
|