|
@@ -56,6 +56,7 @@ import java.util.concurrent.CyclicBarrier;
|
|
|
import org.apache.hadoop.thirdparty.com.google.common.collect.Sets;
|
|
|
import org.apache.hadoop.service.ServiceStateException;
|
|
|
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
@@ -6053,4 +6054,79 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase {
|
|
|
Assert.assertEquals(0, desQueue.getUsedResources().getMemorySize());
|
|
|
rm1.close();
|
|
|
}
|
|
|
+
|
|
|
+  /**
+   * (YARN-11191) This test ensures that no deadlock happens while the
+   * refreshQueues is called on the preemptionManager (refresh thread) and the
+   * AbstractCSQueue.getTotalKillableResource is called from the schedule
+   * thread.
+   *
+   * <p>Three threads are started with staggered delays: SCHEDULE takes the
+   * queue's read-lock immediately and asks the preemption manager for the
+   * killable containers after 5 seconds, COMPLETE locks and unlocks the
+   * queue's write-lock after 1 second, and REFRESH calls
+   * {@code PreemptionManager#refreshQueues} after 2 seconds. The test passes
+   * when all three threads terminate before the timeout.
+   *
+   * @throws Exception TestTimedOutException means deadlock
+   */
+  @Test (timeout = 20000)
+  public void testRefreshQueueWithOpenPreemption() throws Exception {
+    CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
+    csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[]{"a"});
+    csConf.setCapacity("root.a", 100);
+    csConf.setQueues("root.a", new String[]{"b"});
+    csConf.setCapacity("root.a.b", 100);
+
+    YarnConfiguration conf = new YarnConfiguration(csConf);
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
+    mgr.init(conf);
+    try (MockRM rm = new MockRM(csConf)) {
+      CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler();
+      PreemptionManager preemptionManager = scheduler.getPreemptionManager();
+      rm.getRMContext().setNodeLabelManager(mgr);
+      rm.start();
+
+      ParentQueue queue = (ParentQueue) scheduler.getQueue("a");
+
+      // The scheduler thread holds the queue's read-lock for 5 seconds
+      // then the preemption's read-lock is used
+      Thread schedulerThread = new Thread(() -> {
+        queue.readLock.lock();
+        try {
+          try {
+            Thread.sleep(5 * 1000);
+          } catch (InterruptedException e) {
+            // Keep the interrupt status visible instead of swallowing it.
+            Thread.currentThread().interrupt();
+          }
+          preemptionManager.getKillableContainers("a",
+              queue.getDefaultNodeLabelExpression());
+        } finally {
+          // Always release the read-lock: a leaked lock would block the
+          // COMPLETE thread forever and look like the deadlock under test.
+          queue.readLock.unlock();
+        }
+      }, "SCHEDULE");
+
+      // The complete thread locks/unlocks the queue's write-lock after 1 second
+      Thread completeThread = new Thread(() -> {
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+        queue.writeLock.lock();
+        queue.writeLock.unlock();
+      }, "COMPLETE");
+
+      // The refresh thread holds the preemption's write-lock after 2 seconds
+      // while it calls the getChildQueues(ByTryLock) that
+      // locks(tryLocks) the queue's read-lock
+      Thread refreshThread = new Thread(() -> {
+        try {
+          Thread.sleep(2 * 1000);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+        preemptionManager.refreshQueues(queue.getParent(), queue);
+      }, "REFRESH");
+
+      schedulerThread.start();
+      completeThread.start();
+      refreshThread.start();
+
+      // If any thread never finishes, the @Test timeout fails the test.
+      schedulerThread.join();
+      completeThread.join();
+      refreshThread.join();
+    }
+  }
|
|
|
}
|