浏览代码

YARN-5783. Verify identification of starved applications. (kasha)

Karthik Kambatla 8 年之前
父节点
当前提交
b45acb5eef
共有 8 个文件被更改,包括 391 次插入25 次删除
  1. 16 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
  2. 16 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
  3. 16 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
  4. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
  5. 33 23
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSStarvedApps.java
  6. 6 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
  7. 58 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerWithMockPreemption.java
  8. 245 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java

+ 16 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java

@@ -1253,6 +1253,22 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
     unconfirmedAllocatedVcores.addAndGet(-res.getVirtualCores());
   }
 
+  @Override
+  public int hashCode() {
+    return getApplicationAttemptId().hashCode();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (! (o instanceof SchedulerApplicationAttempt)) {
+      return false;
+    }
+
+    SchedulerApplicationAttempt other = (SchedulerApplicationAttempt) o;
+    return (this == other ||
+        this.getApplicationAttemptId().equals(other.getApplicationAttemptId()));
+  }
+
   /**
    * Different state for Application Master, user can see this state from web UI
    */

+ 16 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java

@@ -1101,4 +1101,20 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
       }
     }
   }
+
+  /*
+   * Overriding to appease findbugs
+   */
+  @Override
+  public int hashCode() {
+    return super.hashCode();
+  }
+
+  /*
+   * Overriding to appease findbugs
+   */
+  @Override
+  public boolean equals(Object o) {
+    return super.equals(o);
+  }
 }

+ 16 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java

@@ -1168,4 +1168,20 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
     updateAMContainerDiagnostics(AMState.INACTIVATED,
         diagnosticMessageBldr.toString());
   }
+
+  /*
+   * Overriding to appease findbugs
+   */
+  @Override
+  public int hashCode() {
+    return super.hashCode();
+  }
+
+  /*
+   * Overriding to appease findbugs
+   */
+  @Override
+  public boolean equals(Object o) {
+    return super.equals(o);
+  }
 }

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java

@@ -38,7 +38,7 @@ import java.util.TimerTask;
  */
 public class FSPreemptionThread extends Thread {
   private static final Log LOG = LogFactory.getLog(FSPreemptionThread.class);
-  private final FSContext context;
+  protected final FSContext context;
   private final FairScheduler scheduler;
   private final long warnTimeBeforeKill;
   private final Timer preemptionTimer;

+ 33 - 23
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSStarvedApps.java

@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 import java.io.Serializable;
@@ -25,28 +24,49 @@ import java.util.Comparator;
 import java.util.concurrent.PriorityBlockingQueue;
 
 /**
- * Helper class to track starved apps.
+ * Helper class to track starved applications.
  *
  * Initially, this uses a blocking queue. We could use other data structures
  * in the future. This class also has some methods to simplify testing.
  */
-public class FSStarvedApps {
-  private int numAppsAddedSoFar;
-  private PriorityBlockingQueue<FSAppAttempt> apps;
+class FSStarvedApps {
 
-  public FSStarvedApps() {
-    apps = new PriorityBlockingQueue<>(10, new StarvationComparator());
+  // List of apps to be processed by the preemption thread.
+  private PriorityBlockingQueue<FSAppAttempt> appsToProcess;
+
+  // App being currently processed. This assumes a single reader.
+  private FSAppAttempt appBeingProcessed;
+
+  FSStarvedApps() {
+    appsToProcess = new PriorityBlockingQueue<>(10, new StarvationComparator());
   }
 
-  public void addStarvedApp(FSAppAttempt app) {
-    if (!apps.contains(app)) {
-      apps.add(app);
-      numAppsAddedSoFar++;
+  /**
+   * Add a starved application if it is not already added.
+   * @param app application to add
+   */
+  void addStarvedApp(FSAppAttempt app) {
+    if (!app.equals(appBeingProcessed) && !appsToProcess.contains(app)) {
+      appsToProcess.add(app);
     }
   }
 
-  public FSAppAttempt take() throws InterruptedException {
-    return apps.take();
+  /**
+   * Blocking call to fetch the next app to process. The returned app is
+   * tracked until the next call to this method. This tracking assumes a
+   * single reader.
+   *
+   * @return starved application to process
+   * @throws InterruptedException if interrupted while waiting
+   */
+  FSAppAttempt take() throws InterruptedException {
+    // Reset appBeingProcessed before the blocking call
+    appBeingProcessed = null;
+
+    // Blocking call to fetch the next starved application
+    FSAppAttempt app = appsToProcess.take();
+    appBeingProcessed = app;
+    return app;
   }
 
   private static class StarvationComparator implements
@@ -62,14 +82,4 @@ public class FSStarvedApps {
       return ret;
     }
   }
-
-  @VisibleForTesting
-  public int getNumAppsAddedSoFar() {
-    return numAppsAddedSoFar;
-  }
-
-  @VisibleForTesting
-  public int numStarvedApps() {
-    return apps.size();
-  }
 }

+ 6 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java

@@ -1244,7 +1244,7 @@ public class FairScheduler extends
       }
 
       if (this.conf.getPreemptionEnabled()) {
-        preemptionThread = new FSPreemptionThread(this);
+        createPreemptionThread();
       }
     } finally {
       writeLock.unlock();
@@ -1262,6 +1262,11 @@ public class FairScheduler extends
     }
   }
 
+  @VisibleForTesting
+  protected void createPreemptionThread() {
+    preemptionThread = new FSPreemptionThread(this);
+  }
+
   private void updateReservationThreshold() {
     Resource newThreshold = Resources.multiply(
         getIncrementResourceCapability(),

+ 58 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerWithMockPreemption.java

@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class FairSchedulerWithMockPreemption extends FairScheduler {
+  @Override
+  protected void createPreemptionThread() {
+    preemptionThread = new MockPreemptionThread(this);
+  }
+
+  static class MockPreemptionThread extends FSPreemptionThread {
+    private Set<FSAppAttempt> appsAdded = new HashSet<>();
+    private int totalAppsAdded = 0;
+
+    MockPreemptionThread(FairScheduler scheduler) {
+      super(scheduler);
+    }
+
+    @Override
+    public void run() {
+      while (!Thread.interrupted()) {
+        try {
+          FSAppAttempt app = context.getStarvedApps().take();
+          appsAdded.add(app);
+          totalAppsAdded++;
+        } catch (InterruptedException e) {
+          return;
+        }
+      }
+    }
+
+    int uniqueAppsAdded() {
+      return appsAdded.size();
+    }
+
+    int totalAppsAdded() {
+      return totalAppsAdded;
+    }
+  }
+}

+ 245 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java

@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.After;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Test class to verify identification of app starvation
+ */
+public class TestFSAppStarvation extends FairSchedulerTestBase {
+
+  private static final File ALLOC_FILE = new File(TEST_DIR, "test-queues");
+  private final List<RMNode> rmNodes = new ArrayList<>();
+
+  // Node Capacity = NODE_CAPACITY_MULTIPLE * (1 GB or 1 vcore)
+  private static final int NODE_CAPACITY_MULTIPLE = 4;
+
+  private FairSchedulerWithMockPreemption.MockPreemptionThread preemptionThread;
+
+  @Before
+  public void setup() {
+    createConfiguration();
+    conf.set(YarnConfiguration.RM_SCHEDULER,
+        FairSchedulerWithMockPreemption.class.getCanonicalName());
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE,
+        ALLOC_FILE.getAbsolutePath());
+    conf.setBoolean(FairSchedulerConfiguration.PREEMPTION, true);
+    conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, 0f);
+  }
+
+  @After
+  public void teardown() {
+    ALLOC_FILE.delete();
+    conf = null;
+    if (resourceManager != null) {
+      resourceManager.stop();
+      resourceManager = null;
+    }
+  }
+
+  /*
+   * Test to verify application starvation is computed only when preemption
+   * is enabled.
+   */
+  @Test
+  public void testPreemptionDisabled() throws Exception {
+    conf.setBoolean(FairSchedulerConfiguration.PREEMPTION, false);
+
+    setupClusterAndSubmitJobs();
+
+    assertNull("Found starved apps even when preemption is turned off",
+        scheduler.getContext().getStarvedApps());
+  }
+
+  /*
+   * Test to verify application starvation is computed correctly when
+   * preemption is turned on.
+   */
+  @Test
+  public void testPreemptionEnabled() throws Exception {
+    setupClusterAndSubmitJobs();
+
+    assertNotNull("FSContext does not have an FSStarvedApps instance",
+        scheduler.getContext().getStarvedApps());
+    assertEquals("Expecting 2 starved applications, one each for the " +
+            "minshare and fairshare queues", 2,
+        preemptionThread.uniqueAppsAdded());
+
+    // Verify the apps get added again on a subsequent update
+    scheduler.update();
+    Thread.yield();
+    assertTrue("Each app is marked as starved exactly once",
+        preemptionThread.totalAppsAdded() > preemptionThread.uniqueAppsAdded());
+  }
+
+  /*
+   * Test to verify app starvation is computed only when the cluster
+   * utilization threshold is over the preemption threshold.
+   */
+  @Test
+  public void testClusterUtilizationThreshold() throws Exception {
+    // Set preemption threshold to 1.1, so the utilization is always lower
+    conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, 1.1f);
+
+    setupClusterAndSubmitJobs();
+
+    assertNotNull("FSContext does not have an FSStarvedApps instance",
+        scheduler.getContext().getStarvedApps());
+    assertEquals("Found starved apps when preemption threshold is over 100%", 0,
+        preemptionThread.totalAppsAdded());
+  }
+
+  private void setupClusterAndSubmitJobs() throws Exception {
+    setupStarvedCluster();
+    submitAppsToEachLeafQueue();
+    sendNodeUpdateEvents();
+
+    // Sleep to hit the preemption timeouts
+    Thread.sleep(10);
+
+    // Scheduler update to populate starved apps
+    scheduler.update();
+
+    // Wait for apps to be processed by MockPreemptionThread
+    Thread.yield();
+  }
+
+  /**
+   * Setup the cluster for starvation testing:
+   * 1. Create FS allocation file
+   * 2. Create and start MockRM
+   * 3. Add two nodes to the cluster
+   * 4. Submit an app that uses up all resources on the cluster
+   */
+  private void setupStarvedCluster() throws IOException {
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+
+    // Default queue
+    out.println("<queue name=\"default\">");
+    out.println("</queue>");
+
+    // Queue with preemption disabled
+    out.println("<queue name=\"no-preemption\">");
+    out.println("<fairSharePreemptionThreshold>0" +
+        "</fairSharePreemptionThreshold>");
+    out.println("</queue>");
+
+    // Queue with minshare preemption enabled
+    out.println("<queue name=\"minshare\">");
+    out.println("<fairSharePreemptionThreshold>0" +
+        "</fairSharePreemptionThreshold>");
+    out.println("<minSharePreemptionTimeout>0" +
+        "</minSharePreemptionTimeout>");
+    out.println("<minResources>2048mb,2vcores</minResources>");
+    out.println("</queue>");
+
+    // Queue with fairshare preemption enabled
+    out.println("<queue name=\"fairshare\">");
+    out.println("<fairSharePreemptionThreshold>1" +
+        "</fairSharePreemptionThreshold>");
+    out.println("<fairSharePreemptionTimeout>0" +
+        "</fairSharePreemptionTimeout>");
+
+    // Child queue under fairshare with same settings
+    out.println("<queue name=\"child\">");
+    out.println("<fairSharePreemptionThreshold>1" +
+        "</fairSharePreemptionThreshold>");
+    out.println("<fairSharePreemptionTimeout>0" +
+        "</fairSharePreemptionTimeout>");
+    out.println("</queue>");
+
+    out.println("</queue>");
+
+    out.println("</allocations>");
+    out.close();
+
+    assertTrue("Allocation file does not exist, not running the test",
+        ALLOC_FILE.exists());
+
+    resourceManager = new MockRM(conf);
+    resourceManager.start();
+    scheduler = (FairScheduler) resourceManager.getResourceScheduler();
+    preemptionThread = (FairSchedulerWithMockPreemption.MockPreemptionThread)
+        scheduler.preemptionThread;
+
+    // Create and add two nodes to the cluster
+    addNode(NODE_CAPACITY_MULTIPLE * 1024, NODE_CAPACITY_MULTIPLE);
+    addNode(NODE_CAPACITY_MULTIPLE * 1024, NODE_CAPACITY_MULTIPLE);
+
+    // Create an app that takes up all the resources on the cluster
+    ApplicationAttemptId app
+        = createSchedulingRequest(1024, 1, "root.default", "default", 8);
+
+    scheduler.update();
+    sendNodeUpdateEvents();
+
+    assertEquals(8, scheduler.getSchedulerApp(app).getLiveContainers().size());
+  }
+
+  private void submitAppsToEachLeafQueue() {
+    String queues[] = {"no-preemption", "minshare", "fairshare.child"};
+    for (String queue : queues) {
+      createSchedulingRequest(1024, 1, "root." + queue, "user", 1);
+    }
+    scheduler.update();
+  }
+
+  private void addNode(int memory, int cores) {
+    int id = rmNodes.size() + 1;
+    RMNode node =
+        MockNodes.newNodeInfo(1, Resources.createResource(memory, cores), id,
+            "127.0.0." + id);
+    scheduler.handle(new NodeAddedSchedulerEvent(node));
+    rmNodes.add(node);
+  }
+
+  private void sendNodeUpdateEvents() {
+    for (RMNode node : rmNodes) {
+      NodeUpdateSchedulerEvent nodeUpdateSchedulerEvent =
+          new NodeUpdateSchedulerEvent(node);
+      for (int i = 0; i < NODE_CAPACITY_MULTIPLE; i++) {
+        scheduler.handle(nodeUpdateSchedulerEvent);
+      }
+    }
+  }
+}