|
@@ -24,7 +24,7 @@ import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
|
|
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
|
|
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
|
|
|
-import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMCommonPolicy;
|
|
|
+import org.apache.hadoop.hdds.scm.SCMCommonPlacementPolicy;
|
|
|
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
|
|
|
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
|
|
|
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
|
|
@@ -47,7 +47,7 @@ import java.util.stream.Collectors;
|
|
|
* 3. Choose an anchor node among the viable nodes.
|
|
|
* 4. Choose other nodes around the anchor node based on network topology
|
|
|
*/
|
|
|
-public final class PipelinePlacementPolicy extends SCMCommonPolicy {
|
|
|
+public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
|
|
|
@VisibleForTesting
|
|
|
static final Logger LOG =
|
|
|
LoggerFactory.getLogger(PipelinePlacementPolicy.class);
|
|
@@ -150,33 +150,41 @@ public final class PipelinePlacementPolicy extends SCMCommonPolicy {
|
|
|
public List<DatanodeDetails> chooseDatanodes(
|
|
|
List<DatanodeDetails> excludedNodes, List<DatanodeDetails> favoredNodes,
|
|
|
int nodesRequired, final long sizeRequired) throws SCMException {
|
|
|
- // get a list of viable nodes based on criteria
|
|
|
+ // Get a list of viable nodes based on criteria
|
|
|
+ // and make sure excludedNodes are excluded from list.
|
|
|
List<DatanodeDetails> healthyNodes =
|
|
|
filterViableNodes(excludedNodes, nodesRequired);
|
|
|
-
|
|
|
- List<DatanodeDetails> results = new ArrayList<>();
|
|
|
-
|
|
|
+
|
|
|
// Randomly picks nodes when all nodes are equal.
|
|
|
// This happens when network topology is absent or
|
|
|
// all nodes are on the same rack.
|
|
|
if (checkAllNodesAreEqual(nodeManager.getClusterNetworkTopologyMap())) {
|
|
|
LOG.info("All nodes are considered equal. Now randomly pick nodes. " +
|
|
|
"Required nodes: {}", nodesRequired);
|
|
|
- results = super.getResultSet(nodesRequired, healthyNodes);
|
|
|
- if (results.size() < nodesRequired) {
|
|
|
- LOG.error("Unable to find the required number of healthy nodes that " +
|
|
|
- "meet the criteria. Required nodes: {}, Found nodes: {}",
|
|
|
- nodesRequired, results.size());
|
|
|
- throw new SCMException("Unable to find required number of nodes.",
|
|
|
- SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
|
|
|
- }
|
|
|
- return results;
|
|
|
+ return super.getResultSet(nodesRequired, healthyNodes);
|
|
|
+ } else {
|
|
|
+ // Since topology and rack awareness are available, picks nodes
|
|
|
+ // based on them.
|
|
|
+ return this.getResultSet(nodesRequired, healthyNodes);
|
|
|
}
|
|
|
+ }
|
|
|
|
|
|
+ /**
|
|
|
+ * Get result set based on the pipeline placement algorithm which considers
|
|
|
+ * network topology and rack awareness.
|
|
|
+ * @param nodesRequired - Nodes Required
|
|
|
+ * @param healthyNodes - List of Nodes in the result set.
|
|
|
+ * @return a list of datanodes
|
|
|
+ * @throws SCMException SCMException
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public List<DatanodeDetails> getResultSet(
|
|
|
+ int nodesRequired, List<DatanodeDetails> healthyNodes)
|
|
|
+ throws SCMException {
|
|
|
+ List <DatanodeDetails> results = new ArrayList<>(nodesRequired);
|
|
|
// Since nodes are widely distributed, the results should be selected
|
|
|
// base on distance in topology, rack awareness and load balancing.
|
|
|
List<DatanodeDetails> exclude = new ArrayList<>();
|
|
|
- exclude.addAll(excludedNodes);
|
|
|
// First choose an anchor nodes randomly
|
|
|
DatanodeDetails anchor = chooseNode(healthyNodes);
|
|
|
if (anchor == null) {
|
|
@@ -193,7 +201,7 @@ public final class PipelinePlacementPolicy extends SCMCommonPolicy {
|
|
|
|
|
|
// Choose the second node on different racks from anchor.
|
|
|
DatanodeDetails nodeOnDifferentRack = chooseNodeBasedOnRackAwareness(
|
|
|
- healthyNodes, excludedNodes,
|
|
|
+ healthyNodes, exclude,
|
|
|
nodeManager.getClusterNetworkTopologyMap(), anchor);
|
|
|
if (nodeOnDifferentRack == null) {
|
|
|
LOG.error("Unable to find nodes on different racks that " +
|