
YARN-10399 Refactor NodeQueueLoadMonitor class to make it extendable (#2228)

Refactor NodeQueueLoadMonitor class to make it extendable
Zhengbo Li 4 years ago
parent
commit
9b9f7ea16a

+ 93 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/ClusterNode.java

@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.yarn.api.records.NodeId;
+
+/**
+ * Represents a node in the cluster from the NodeQueueLoadMonitor's perspective
+ */
+public class ClusterNode {
+  private final AtomicInteger queueLength = new AtomicInteger(0);
+  private final AtomicInteger queueWaitTime = new AtomicInteger(-1);
+  private long timestamp;
+  final NodeId nodeId;
+  private int queueCapacity = 0;
+  private final HashSet<String> labels;
+
+  public ClusterNode(NodeId nodeId) {
+    this.nodeId = nodeId;
+    this.labels = new HashSet<>();
+    updateTimestamp();
+  }
+
+  public ClusterNode setQueueLength(int qLength) {
+    this.queueLength.set(qLength);
+    return this;
+  }
+
+  public ClusterNode setQueueWaitTime(int wTime) {
+    this.queueWaitTime.set(wTime);
+    return this;
+  }
+
+  public ClusterNode updateTimestamp() {
+    this.timestamp = System.currentTimeMillis();
+    return this;
+  }
+
+  public ClusterNode setQueueCapacity(int capacity) {
+    this.queueCapacity = capacity;
+    return this;
+  }
+
+  public ClusterNode setNodeLabels(Collection<String> labelsToAdd) {
+    labels.clear();
+    labels.addAll(labelsToAdd);
+    return this;
+  }
+
+  public boolean hasLabel(String label) {
+    return this.labels.contains(label);
+  }
+
+  public long getTimestamp() {
+    return this.timestamp;
+  }
+
+  public AtomicInteger getQueueLength() {
+    return this.queueLength;
+  }
+
+  public AtomicInteger getQueueWaitTime() {
+    return this.queueWaitTime;
+  }
+
+  public int getQueueCapacity() {
+    return this.queueCapacity;
+  }
+
+  public boolean isQueueFull() {
+    return this.queueCapacity > 0 &&
+        this.queueLength.get() >= this.queueCapacity;
+  }
+}

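The extracted ClusterNode class keeps the fluent, chainable setters of the former inner class and adds node-label support. Below is a minimal usage sketch; the host name, port, and label values are illustrative only and not part of the patch, which in the monitor itself populates these fields from an RMNode's OpportunisticContainersStatus.

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed;

import java.util.Arrays;

import org.apache.hadoop.yarn.api.records.NodeId;

public class ClusterNodeUsageSketch {
  public static void main(String[] args) {
    // Illustrative values; the monitor itself takes them from the node's
    // reported OpportunisticContainersStatus.
    ClusterNode node = new ClusterNode(NodeId.newInstance("worker-01", 45454))
        .setQueueWaitTime(120)                 // estimated queue wait time reported by the node
        .setQueueLength(3)                     // current opportunistic wait queue length
        .setQueueCapacity(10)                  // maximum queue length before the node counts as full
        .setNodeLabels(Arrays.asList("GPU"));  // labels are now carried by ClusterNode

    // isQueueFull() only reports true when a positive capacity is set
    // and the queue length has reached it.
    System.out.println(node.isQueueFull());    // false: 3 < 10
    System.out.println(node.hasLabel("GPU"));  // true
  }
}

Returning this from every setter is what lets the monitor chain setQueueWaitTime(...).setQueueLength(...).setNodeLabels(...).updateTimestamp() in onExistingNodeUpdated further down in this commit.
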
+ 112 - 105
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java

@@ -40,8 +40,8 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
 
 import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED;
 
@@ -53,10 +53,10 @@ import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_OPP_CONTAINE
  */
 public class NodeQueueLoadMonitor implements ClusterMonitor {
 
-  private final static Logger LOG = LoggerFactory.
+  protected final static Logger LOG = LoggerFactory.
       getLogger(NodeQueueLoadMonitor.class);
 
-  private int numNodesForAnyAllocation =
+  protected int numNodesForAnyAllocation =
       DEFAULT_OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED;
 
   /**
@@ -70,14 +70,14 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
     @Override
     public int compare(ClusterNode o1, ClusterNode o2) {
       if (getMetric(o1) == getMetric(o2)) {
-        return (int)(o2.timestamp - o1.timestamp);
+        return (int)(o2.getTimestamp() - o1.getTimestamp());
       }
       return getMetric(o1) - getMetric(o2);
     }
 
     public int getMetric(ClusterNode c) {
       return (this == QUEUE_LENGTH) ?
-          c.queueLength.get() : c.queueWaitTime.get();
+          c.getQueueLength().get() : c.getQueueWaitTime().get();
     }
 
     /**
@@ -88,11 +88,11 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
      */
     public boolean compareAndIncrement(ClusterNode c, int incrementSize) {
       if(this == QUEUE_LENGTH) {
-        int ret = c.queueLength.addAndGet(incrementSize);
-        if (ret <= c.queueCapacity) {
+        int ret = c.getQueueLength().addAndGet(incrementSize);
+        if (ret <= c.getQueueCapacity()) {
           return true;
         }
-        c.queueLength.addAndGet(-incrementSize);
+        c.getQueueLength().addAndGet(-incrementSize);
         return false;
       }
       // for queue wait time, we don't have any threshold.
@@ -100,57 +100,19 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
     }
   }
 
-  static class ClusterNode {
-    private AtomicInteger queueLength = new AtomicInteger(0);
-    private AtomicInteger queueWaitTime = new AtomicInteger(-1);
-    private long timestamp;
-    final NodeId nodeId;
-    private int queueCapacity = 0;
-
-    public ClusterNode(NodeId nodeId) {
-      this.nodeId = nodeId;
-      updateTimestamp();
-    }
-
-    public ClusterNode setQueueLength(int qLength) {
-      this.queueLength.set(qLength);
-      return this;
-    }
-
-    public ClusterNode setQueueWaitTime(int wTime) {
-      this.queueWaitTime.set(wTime);
-      return this;
-    }
-
-    public ClusterNode updateTimestamp() {
-      this.timestamp = System.currentTimeMillis();
-      return this;
-    }
-
-    public ClusterNode setQueueCapacity(int capacity) {
-      this.queueCapacity = capacity;
-      return this;
-    }
-
-    public boolean isQueueFull() {
-      return this.queueCapacity > 0 &&
-          this.queueLength.get() >= this.queueCapacity;
-    }
-  }
-
   private final ScheduledExecutorService scheduledExecutor;
 
-  private final List<NodeId> sortedNodes;
-  private final Map<NodeId, ClusterNode> clusterNodes =
+  protected final List<NodeId> sortedNodes;
+  protected final Map<NodeId, ClusterNode> clusterNodes =
       new ConcurrentHashMap<>();
-  private final Map<String, RMNode> nodeByHostName =
+  protected final Map<String, RMNode> nodeByHostName =
       new ConcurrentHashMap<>();
-  private final Map<String, Set<NodeId>> nodeIdsByRack =
+  protected final Map<String, Set<NodeId>> nodeIdsByRack =
       new ConcurrentHashMap<>();
-  private final LoadComparator comparator;
-  private QueueLimitCalculator thresholdCalculator;
-  private ReentrantReadWriteLock sortedNodesLock = new ReentrantReadWriteLock();
-  private ReentrantReadWriteLock clusterNodesLock =
+  protected final LoadComparator comparator;
+  protected QueueLimitCalculator thresholdCalculator;
+  protected ReentrantReadWriteLock sortedNodesLock = new ReentrantReadWriteLock();
+  protected ReentrantReadWriteLock clusterNodesLock =
       new ReentrantReadWriteLock();
 
   Runnable computeTask = new Runnable() {
@@ -160,9 +122,7 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
       writeLock.lock();
       try {
         try {
-          List<NodeId> nodeIds = sortNodes();
-          sortedNodes.clear();
-          sortedNodes.addAll(nodeIds);
+          updateSortedNodes();
         } catch (Exception ex) {
           LOG.warn("Got Exception while sorting nodes..", ex);
         }
@@ -193,6 +153,14 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
     numNodesForAnyAllocation = numNodes;
   }
 
+  protected void updateSortedNodes() {
+    List<NodeId> nodeIds = sortNodes(true).stream()
+        .map(n -> n.nodeId)
+        .collect(Collectors.toList());
+    sortedNodes.clear();
+    sortedNodes.addAll(nodeIds);
+  }
+
   List<NodeId> getSortedNodes() {
     return sortedNodes;
   }
@@ -239,6 +207,7 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
     ClusterNode node;
     try {
       node = this.clusterNodes.remove(removedRMNode.getNodeID());
+      onNodeRemoved(node);
     } finally {
       writeLock.unlock();
     }
@@ -251,6 +220,13 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
     }
   }
 
+  /**
+   * Provide an integration point for extended class
+   * @param node the node removed
+   */
+  protected void onNodeRemoved(ClusterNode node) {
+  }
+
   @Override
   public void updateNode(RMNode rmNode) {
     LOG.debug("Node update event from: {}", rmNode.getNodeID());
@@ -260,55 +236,80 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
       opportunisticContainersStatus =
           OpportunisticContainersStatus.newInstance();
     }
-    int opportQueueCapacity =
-        opportunisticContainersStatus.getOpportQueueCapacity();
-    int estimatedQueueWaitTime =
-        opportunisticContainersStatus.getEstimatedQueueWaitTime();
-    int waitQueueLength = opportunisticContainersStatus.getWaitQueueLength();
+
     // Add nodes to clusterNodes. If estimatedQueueTime is -1, ignore node
     // UNLESS comparator is based on queue length.
     ReentrantReadWriteLock.WriteLock writeLock = clusterNodesLock.writeLock();
     writeLock.lock();
     try {
-      ClusterNode currentNode = this.clusterNodes.get(rmNode.getNodeID());
-      if (currentNode == null) {
-        if (rmNode.getState() != NodeState.DECOMMISSIONING &&
-            (estimatedQueueWaitTime != -1 ||
-                comparator == LoadComparator.QUEUE_LENGTH)) {
-          this.clusterNodes.put(rmNode.getNodeID(),
-              new ClusterNode(rmNode.getNodeID())
-                  .setQueueWaitTime(estimatedQueueWaitTime)
-                  .setQueueLength(waitQueueLength)
-                  .setQueueCapacity(opportQueueCapacity));
-          LOG.info("Inserting ClusterNode [" + rmNode.getNodeID() + "] " +
-              "with queue wait time [" + estimatedQueueWaitTime + "] and " +
-              "wait queue length [" + waitQueueLength + "]");
-        } else {
-          LOG.warn("IGNORING ClusterNode [" + rmNode.getNodeID() + "] " +
-              "with queue wait time [" + estimatedQueueWaitTime + "] and " +
-              "wait queue length [" + waitQueueLength + "]");
-        }
+      ClusterNode clusterNode = this.clusterNodes.get(rmNode.getNodeID());
+      if (clusterNode == null) {
+        onNewNodeAdded(rmNode, opportunisticContainersStatus);
       } else {
-        if (rmNode.getState() != NodeState.DECOMMISSIONING &&
-            (estimatedQueueWaitTime != -1 ||
-                comparator == LoadComparator.QUEUE_LENGTH)) {
-          currentNode
+        onExistingNodeUpdated(rmNode, clusterNode, opportunisticContainersStatus);
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  protected void onNewNodeAdded(
+      RMNode rmNode, OpportunisticContainersStatus status) {
+    int opportQueueCapacity = status.getOpportQueueCapacity();
+    int estimatedQueueWaitTime = status.getEstimatedQueueWaitTime();
+    int waitQueueLength = status.getWaitQueueLength();
+
+    if (rmNode.getState() != NodeState.DECOMMISSIONING &&
+        (estimatedQueueWaitTime != -1 ||
+            comparator == LoadComparator.QUEUE_LENGTH)) {
+      this.clusterNodes.put(rmNode.getNodeID(),
+          new ClusterNode(rmNode.getNodeID())
               .setQueueWaitTime(estimatedQueueWaitTime)
               .setQueueLength(waitQueueLength)
-              .updateTimestamp();
-          LOG.debug("Updating ClusterNode [{}] with queue wait time [{}] and"
+              .setNodeLabels(rmNode.getNodeLabels())
+              .setQueueCapacity(opportQueueCapacity));
+      LOG.info(
+          "Inserting ClusterNode [{}] with queue wait time [{}] and "
+              + "wait queue length [{}]",
+          rmNode.getNode(),
+          estimatedQueueWaitTime,
+          waitQueueLength
+      );
+    } else {
+      LOG.warn(
+          "IGNORING ClusterNode [{}] with queue wait time [{}] and "
+              + "wait queue length [{}]",
+          rmNode.getNode(),
+          estimatedQueueWaitTime,
+          waitQueueLength
+      );
+    }
+  }
+
+  protected void onExistingNodeUpdated(
+      RMNode rmNode, ClusterNode clusterNode,
+      OpportunisticContainersStatus status) {
+
+    int estimatedQueueWaitTime = status.getEstimatedQueueWaitTime();
+    int waitQueueLength = status.getWaitQueueLength();
+
+    if (rmNode.getState() != NodeState.DECOMMISSIONING &&
+        (estimatedQueueWaitTime != -1 ||
+            comparator == LoadComparator.QUEUE_LENGTH)) {
+      clusterNode
+          .setQueueWaitTime(estimatedQueueWaitTime)
+          .setQueueLength(waitQueueLength)
+          .setNodeLabels(rmNode.getNodeLabels())
+          .updateTimestamp();
+      LOG.debug("Updating ClusterNode [{}] with queue wait time [{}] and"
               + " wait queue length [{}]", rmNode.getNodeID(),
-              estimatedQueueWaitTime, waitQueueLength);
+          estimatedQueueWaitTime, waitQueueLength);
 
-        } else {
-          this.clusterNodes.remove(rmNode.getNodeID());
-          LOG.info("Deleting ClusterNode [" + rmNode.getNodeID() + "] " +
-              "with queue wait time [" + currentNode.queueWaitTime + "] and " +
-              "wait queue length [" + currentNode.queueLength + "]");
-        }
-      }
-    } finally {
-      writeLock.unlock();
+    } else {
+      this.clusterNodes.remove(rmNode.getNodeID());
+      LOG.info("Deleting ClusterNode [" + rmNode.getNodeID() + "] " +
+          "with queue wait time [" + clusterNode.getQueueWaitTime() + "] and " +
+          "wait queue length [" + clusterNode.getQueueLength() + "]");
     }
   }
 
@@ -374,7 +375,7 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
   }
 
   public RMNode selectAnyNode(Set<String> blacklist) {
-    List<NodeId> nodeIds = selectLeastLoadedNodes(numNodesForAnyAllocation);
+    List<NodeId> nodeIds = getCandidatesForSelectAnyNode();
     int size = nodeIds.size();
     if (size <= 0) {
       return null;
@@ -395,22 +396,26 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
     return null;
   }
 
-  private void removeFromNodeIdsByRack(RMNode removedNode) {
+  protected List<NodeId> getCandidatesForSelectAnyNode() {
+    return selectLeastLoadedNodes(numNodesForAnyAllocation);
+  }
+
+  protected void removeFromNodeIdsByRack(RMNode removedNode) {
     nodeIdsByRack.computeIfPresent(removedNode.getRackName(),
         (k, v) -> v).remove(removedNode.getNodeID());
   }
 
-  private void addIntoNodeIdsByRack(RMNode addedNode) {
+  protected void addIntoNodeIdsByRack(RMNode addedNode) {
     nodeIdsByRack.compute(addedNode.getRackName(), (k, v) -> v == null ?
         ConcurrentHashMap.newKeySet() : v).add(addedNode.getNodeID());
   }
 
-  private List<NodeId> sortNodes() {
+  protected List<ClusterNode> sortNodes(boolean excludeFullNodes) {
     ReentrantReadWriteLock.ReadLock readLock = clusterNodesLock.readLock();
     readLock.lock();
     try {
-      ArrayList aList = new ArrayList<>(this.clusterNodes.values());
-      List<NodeId> retList = new ArrayList<>();
+      ArrayList<ClusterNode> aList = new ArrayList<>(this.clusterNodes.values());
+      List<ClusterNode> retList = new ArrayList<>();
       Object[] nodes = aList.toArray();
       // Collections.sort would do something similar by calling Arrays.sort
       // internally but would finally iterate through the input list (aList)
@@ -420,9 +425,11 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
       Arrays.sort(nodes, (Comparator)comparator);
       for (int j=0; j < nodes.length; j++) {
         ClusterNode cNode = (ClusterNode)nodes[j];
-        // Exclude nodes whose queue is already full.
-        if (!cNode.isQueueFull()) {
-          retList.add(cNode.nodeId);
+        // Only add node to the result list when either condition is met:
+        // 1. we don't exclude full nodes
+        // 2. we do exclude full nodes, but the current node is not full
+        if (!excludeFullNodes || !cNode.isQueueFull()) {
+          retList.add(cNode);
         }
       }
       return retList;

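The protected fields and the new hook methods (updateSortedNodes, onNewNodeAdded, onExistingNodeUpdated, onNodeRemoved, getCandidatesForSelectAnyNode, and the parameterized sortNodes) are the extension points this change introduces. Below is a minimal sketch of a hypothetical subclass that keeps a side index of nodes carrying a required label and prefers them when a caller asks for any node; the subclass name, the requiredLabel parameter, and the three-argument super constructor are assumptions for illustration, not part of the patch.

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed;

import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;

/**
 * Hypothetical extension (not part of YARN-10399) that tracks which
 * cluster nodes carry a required label and prefers them when selecting
 * an arbitrary node.
 */
public class LabelAwareNodeQueueLoadMonitor extends NodeQueueLoadMonitor {

  private final String requiredLabel;
  private final Set<NodeId> labelledNodes = ConcurrentHashMap.newKeySet();

  public LabelAwareNodeQueueLoadMonitor(long nodeComputationInterval,
      LoadComparator comparator, int numNodes, String requiredLabel) {
    // Assumes the monitor's existing three-argument constructor; only the
    // numNodesForAnyAllocation = numNodes assignment is visible in the hunk above.
    super(nodeComputationInterval, comparator, numNodes);
    this.requiredLabel = requiredLabel;
  }

  @Override
  protected void onNewNodeAdded(RMNode rmNode,
      OpportunisticContainersStatus status) {
    super.onNewNodeAdded(rmNode, status);
    // clusterNodes is protected after this change, so subclasses can inspect
    // the node the base class just inserted (if it was not ignored).
    ClusterNode node = clusterNodes.get(rmNode.getNodeID());
    if (node != null && node.hasLabel(requiredLabel)) {
      labelledNodes.add(node.nodeId);
    }
  }

  @Override
  protected void onNodeRemoved(ClusterNode node) {
    // The base class passes the ClusterNode it removed; it may be null
    // if the node was never tracked.
    if (node != null) {
      labelledNodes.remove(node.nodeId);
    }
  }

  @Override
  protected List<NodeId> getCandidatesForSelectAnyNode() {
    // Prefer labelled nodes, sorted by load with full queues excluded;
    // fall back to the default least-loaded candidates otherwise.
    List<NodeId> labelled = sortNodes(true).stream()
        .map(n -> n.nodeId)
        .filter(labelledNodes::contains)
        .collect(Collectors.toList());
    return labelled.isEmpty()
        ? super.getCandidatesForSelectAnyNode() : labelled;
  }
}

A complete implementation would likely also override onExistingNodeUpdated, since the base class can both refresh and delete a node from that path, but the sketch above is enough to show where the new seams sit.
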
+ 0 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/QueueLimitCalculator.java

@@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed;
 
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.NodeQueueLoadMonitor.ClusterNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.NodeQueueLoadMonitor.LoadComparator;
 
 import java.util.List;