|
@@ -38,8 +38,6 @@ import java.util.logging.*;
|
|
class FSNamesystem implements FSConstants {
|
|
class FSNamesystem implements FSConstants {
|
|
public static final Logger LOG = LogFormatter.getLogger("org.apache.hadoop.fs.FSNamesystem");
|
|
public static final Logger LOG = LogFormatter.getLogger("org.apache.hadoop.fs.FSNamesystem");
|
|
|
|
|
|
-
|
|
|
|
-
|
|
|
|
//
|
|
//
|
|
// Stores the correct file name hierarchy
|
|
// Stores the correct file name hierarchy
|
|
//
|
|
//
|
|
@@ -144,11 +142,7 @@ class FSNamesystem implements FSConstants {
|
|
private int minReplication;
|
|
private int minReplication;
|
|
// HEARTBEAT_RECHECK is how often a datanode sends its hearbeat
|
|
// HEARTBEAT_RECHECK is how often a datanode sends its hearbeat
|
|
private int heartBeatRecheck;
|
|
private int heartBeatRecheck;
|
|
- // Whether we should use disk-availability info when determining target
|
|
|
|
- private boolean useAvailability;
|
|
|
|
|
|
|
|
- private boolean allowSameHostTargets;
|
|
|
|
-
|
|
|
|
/**
|
|
/**
|
|
* dir is where the filesystem directory state
|
|
* dir is where the filesystem directory state
|
|
* is stored
|
|
* is stored
|
|
@@ -167,9 +161,6 @@ class FSNamesystem implements FSConstants {
|
|
this.maxReplicationStreams = conf.getInt("dfs.max-repl-streams", 2);
|
|
this.maxReplicationStreams = conf.getInt("dfs.max-repl-streams", 2);
|
|
this.minReplication = 1;
|
|
this.minReplication = 1;
|
|
this.heartBeatRecheck= 1000;
|
|
this.heartBeatRecheck= 1000;
|
|
- this.useAvailability = conf.getBoolean("dfs.availability.allocation", false);
|
|
|
|
- this.allowSameHostTargets =
|
|
|
|
- conf.getBoolean("test.dfs.same.host.targets.allowed", false);
|
|
|
|
}
|
|
}
|
|
|
|
|
|
/** Close down this filesystem manager.
|
|
/** Close down this filesystem manager.
|
|
@@ -241,7 +232,7 @@ class FSNamesystem implements FSConstants {
|
|
* of machines, or null if src is invalid for creation (based on
|
|
* of machines, or null if src is invalid for creation (based on
|
|
* {@link FSDirectory#isValidToCreate(UTF8)}.
|
|
* {@link FSDirectory#isValidToCreate(UTF8)}.
|
|
*/
|
|
*/
|
|
- public synchronized Object[] startFile(UTF8 src, UTF8 holder, boolean overwrite) {
|
|
|
|
|
|
+ public synchronized Object[] startFile(UTF8 src, UTF8 holder, UTF8 clientMachine, boolean overwrite) {
|
|
Object results[] = null;
|
|
Object results[] = null;
|
|
if (pendingCreates.get(src) == null) {
|
|
if (pendingCreates.get(src) == null) {
|
|
boolean fileValid = dir.isValidToCreate(src);
|
|
boolean fileValid = dir.isValidToCreate(src);
|
|
@@ -254,7 +245,7 @@ class FSNamesystem implements FSConstants {
|
|
results = new Object[2];
|
|
results = new Object[2];
|
|
|
|
|
|
// Get the array of replication targets
|
|
// Get the array of replication targets
|
|
- DatanodeInfo targets[] = chooseTargets(this.desiredReplication, null);
|
|
|
|
|
|
+ DatanodeInfo targets[] = chooseTargets(this.desiredReplication, null, clientMachine);
|
|
if (targets.length < this.minReplication) {
|
|
if (targets.length < this.minReplication) {
|
|
LOG.warning("Target-length is " + targets.length +
|
|
LOG.warning("Target-length is " + targets.length +
|
|
", below MIN_REPLICATION (" + this.minReplication+ ")");
|
|
", below MIN_REPLICATION (" + this.minReplication+ ")");
|
|
@@ -300,7 +291,7 @@ class FSNamesystem implements FSConstants {
|
|
* are replicated. Will return an empty 2-elt array if we want the
|
|
* are replicated. Will return an empty 2-elt array if we want the
|
|
* client to "try again later".
|
|
* client to "try again later".
|
|
*/
|
|
*/
|
|
- public synchronized Object[] getAdditionalBlock(UTF8 src) {
|
|
|
|
|
|
+ public synchronized Object[] getAdditionalBlock(UTF8 src, UTF8 clientMachine) {
|
|
Object results[] = null;
|
|
Object results[] = null;
|
|
if (dir.getFile(src) == null && pendingCreates.get(src) != null) {
|
|
if (dir.getFile(src) == null && pendingCreates.get(src) != null) {
|
|
results = new Object[2];
|
|
results = new Object[2];
|
|
@@ -310,7 +301,7 @@ class FSNamesystem implements FSConstants {
|
|
//
|
|
//
|
|
if (checkFileProgress(src)) {
|
|
if (checkFileProgress(src)) {
|
|
// Get the array of replication targets
|
|
// Get the array of replication targets
|
|
- DatanodeInfo targets[] = chooseTargets(this.desiredReplication, null);
|
|
|
|
|
|
+ DatanodeInfo targets[] = chooseTargets(this.desiredReplication, null, clientMachine);
|
|
if (targets.length < this.minReplication) {
|
|
if (targets.length < this.minReplication) {
|
|
return null;
|
|
return null;
|
|
}
|
|
}
|
|
@@ -1171,7 +1162,7 @@ class FSNamesystem implements FSConstants {
|
|
} else {
|
|
} else {
|
|
TreeSet containingNodes = (TreeSet) blocksMap.get(block);
|
|
TreeSet containingNodes = (TreeSet) blocksMap.get(block);
|
|
if (containingNodes.contains(srcNode)) {
|
|
if (containingNodes.contains(srcNode)) {
|
|
- DatanodeInfo targets[] = chooseTargets(Math.min(this.desiredReplication - containingNodes.size(), this.maxReplicationStreams - xmitsInProgress), containingNodes);
|
|
|
|
|
|
+ DatanodeInfo targets[] = chooseTargets(Math.min(this.desiredReplication - containingNodes.size(), this.maxReplicationStreams - xmitsInProgress), containingNodes, null);
|
|
if (targets.length > 0) {
|
|
if (targets.length > 0) {
|
|
// Build items to return
|
|
// Build items to return
|
|
replicateBlocks.add(block);
|
|
replicateBlocks.add(block);
|
|
@@ -1228,12 +1219,12 @@ class FSNamesystem implements FSConstants {
|
|
* considered targets.
|
|
* considered targets.
|
|
* @return array of DatanodeInfo instances uses as targets.
|
|
* @return array of DatanodeInfo instances uses as targets.
|
|
*/
|
|
*/
|
|
- DatanodeInfo[] chooseTargets(int desiredReplicates, TreeSet forbiddenNodes) {
|
|
|
|
|
|
+ DatanodeInfo[] chooseTargets(int desiredReplicates, TreeSet forbiddenNodes, UTF8 clientMachine) {
|
|
TreeSet alreadyChosen = new TreeSet();
|
|
TreeSet alreadyChosen = new TreeSet();
|
|
Vector targets = new Vector();
|
|
Vector targets = new Vector();
|
|
|
|
|
|
for (int i = 0; i < desiredReplicates; i++) {
|
|
for (int i = 0; i < desiredReplicates; i++) {
|
|
- DatanodeInfo target = chooseTarget(forbiddenNodes, alreadyChosen);
|
|
|
|
|
|
+ DatanodeInfo target = chooseTarget(forbiddenNodes, alreadyChosen, clientMachine);
|
|
if (target != null) {
|
|
if (target != null) {
|
|
targets.add(target);
|
|
targets.add(target);
|
|
alreadyChosen.add(target);
|
|
alreadyChosen.add(target);
|
|
@@ -1256,7 +1247,7 @@ class FSNamesystem implements FSConstants {
|
|
* @return DatanodeInfo instance to use or null if something went wrong
|
|
* @return DatanodeInfo instance to use or null if something went wrong
|
|
* (a log message is emitted if null is returned).
|
|
* (a log message is emitted if null is returned).
|
|
*/
|
|
*/
|
|
- DatanodeInfo chooseTarget(TreeSet forbidden1, TreeSet forbidden2) {
|
|
|
|
|
|
+ DatanodeInfo chooseTarget(TreeSet forbidden1, TreeSet forbidden2, UTF8 clientMachine) {
|
|
//
|
|
//
|
|
// Check if there are any available targets at all
|
|
// Check if there are any available targets at all
|
|
//
|
|
//
|
|
@@ -1266,84 +1257,87 @@ class FSNamesystem implements FSConstants {
|
|
return null;
|
|
return null;
|
|
}
|
|
}
|
|
|
|
|
|
- TreeSet forbiddenMachines = new TreeSet();
|
|
|
|
//
|
|
//
|
|
- // In addition to already-chosen datanode/port pairs, we want to avoid
|
|
|
|
- // already-chosen machinenames. (There can be multiple datanodes per
|
|
|
|
- // machine.) We might relax this requirement in the future, though. (Maybe
|
|
|
|
- // so that at least one replicate is off the machine.)
|
|
|
|
|
|
+ // Build a map of forbidden hostnames from the two forbidden sets.
|
|
//
|
|
//
|
|
- UTF8 hostOrHostAndPort = null;
|
|
|
|
|
|
+ TreeSet forbiddenMachines = new TreeSet();
|
|
if (forbidden1 != null) {
|
|
if (forbidden1 != null) {
|
|
- // add name [and host] of all elements in forbidden1 to forbiddenMachines
|
|
|
|
for (Iterator it = forbidden1.iterator(); it.hasNext(); ) {
|
|
for (Iterator it = forbidden1.iterator(); it.hasNext(); ) {
|
|
DatanodeInfo cur = (DatanodeInfo) it.next();
|
|
DatanodeInfo cur = (DatanodeInfo) it.next();
|
|
- if (allowSameHostTargets) {
|
|
|
|
- hostOrHostAndPort = cur.getName(); // forbid same host:port
|
|
|
|
- } else {
|
|
|
|
- hostOrHostAndPort = cur.getHost(); // forbid same host
|
|
|
|
- }
|
|
|
|
- forbiddenMachines.add(hostOrHostAndPort);
|
|
|
|
|
|
+ forbiddenMachines.add(cur.getHost());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
if (forbidden2 != null) {
|
|
if (forbidden2 != null) {
|
|
- // add name [and host] of all elements in forbidden2 to forbiddenMachines
|
|
|
|
for (Iterator it = forbidden2.iterator(); it.hasNext(); ) {
|
|
for (Iterator it = forbidden2.iterator(); it.hasNext(); ) {
|
|
DatanodeInfo cur = (DatanodeInfo) it.next();
|
|
DatanodeInfo cur = (DatanodeInfo) it.next();
|
|
- if (allowSameHostTargets) {
|
|
|
|
- hostOrHostAndPort = cur.getName(); // forbid same host:port
|
|
|
|
- } else {
|
|
|
|
- hostOrHostAndPort = cur.getHost(); // forbid same host
|
|
|
|
- }
|
|
|
|
- forbiddenMachines.add(hostOrHostAndPort);
|
|
|
|
|
|
+ forbiddenMachines.add(cur.getHost());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
//
|
|
//
|
|
- // Now build list of machines we can actually choose from
|
|
|
|
|
|
+ // Build list of machines we can actually choose from
|
|
//
|
|
//
|
|
- long totalRemaining = 0;
|
|
|
|
|
|
+ long latestRemaining = 0;
|
|
Vector targetList = new Vector();
|
|
Vector targetList = new Vector();
|
|
for (Iterator it = datanodeMap.values().iterator(); it.hasNext(); ) {
|
|
for (Iterator it = datanodeMap.values().iterator(); it.hasNext(); ) {
|
|
DatanodeInfo node = (DatanodeInfo) it.next();
|
|
DatanodeInfo node = (DatanodeInfo) it.next();
|
|
- if (allowSameHostTargets) {
|
|
|
|
- hostOrHostAndPort = node.getName(); // match host:port
|
|
|
|
- } else {
|
|
|
|
- hostOrHostAndPort = node.getHost(); // match host
|
|
|
|
- }
|
|
|
|
- if (! forbiddenMachines.contains(hostOrHostAndPort)) {
|
|
|
|
|
|
+ if (! forbiddenMachines.contains(node.getHost())) {
|
|
targetList.add(node);
|
|
targetList.add(node);
|
|
- totalRemaining += node.getRemaining();
|
|
|
|
|
|
+ latestRemaining += node.getRemaining();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
//
|
|
//
|
|
// Now pick one
|
|
// Now pick one
|
|
//
|
|
//
|
|
- if (targetList.size() == 0) {
|
|
|
|
- LOG.warning("Zero targets found, forbidden1.size=" +
|
|
|
|
- ( forbidden1 != null ? forbidden1.size() : 0 ) +
|
|
|
|
- " allowSameHostTargets=" + allowSameHostTargets +
|
|
|
|
- " forbidden2.size()=" +
|
|
|
|
- ( forbidden2 != null ? forbidden2.size() : 0 ));
|
|
|
|
- return null;
|
|
|
|
- } else if (! this.useAvailability) {
|
|
|
|
- int target = r.nextInt(targetList.size());
|
|
|
|
- return (DatanodeInfo) targetList.elementAt(target);
|
|
|
|
- } else {
|
|
|
|
- // Choose node according to target capacity
|
|
|
|
- double target = r.nextDouble() * totalRemaining;
|
|
|
|
|
|
+ if (targetList.size() > 0) {
|
|
|
|
+ //
|
|
|
|
+ // If the requester's machine is in the targetList,
|
|
|
|
+ // and it's got the capacity, pick it.
|
|
|
|
+ //
|
|
|
|
+ if (clientMachine != null && clientMachine.getLength() > 0) {
|
|
|
|
+ for (Iterator it = targetList.iterator(); it.hasNext(); ) {
|
|
|
|
+ DatanodeInfo node = (DatanodeInfo) it.next();
|
|
|
|
+ if (clientMachine.equals(node.getHost())) {
|
|
|
|
+ if (node.getRemaining() > BLOCK_SIZE * MIN_BLOCKS_FOR_WRITE) {
|
|
|
|
+ return node;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
|
|
|
|
+ //
|
|
|
|
+ // Otherwise, choose node according to target capacity
|
|
|
|
+ //
|
|
|
|
+ double target = Math.abs(r.nextDouble()) * latestRemaining;
|
|
for (Iterator it = targetList.iterator(); it.hasNext(); ) {
|
|
for (Iterator it = targetList.iterator(); it.hasNext(); ) {
|
|
DatanodeInfo node = (DatanodeInfo) it.next();
|
|
DatanodeInfo node = (DatanodeInfo) it.next();
|
|
target -= node.getRemaining();
|
|
target -= node.getRemaining();
|
|
- if (target <= 0) {
|
|
|
|
|
|
+ if ((node.getRemaining() > BLOCK_SIZE * MIN_BLOCKS_FOR_WRITE) &&
|
|
|
|
+ (target <= 0)) {
|
|
return node;
|
|
return node;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- LOG.warning("Impossible state. When trying to choose target node, could not find any. This may indicate that datanode capacities are being updated during datanode selection. Anyway, now returning an arbitrary target to recover...");
|
|
|
|
- return (DatanodeInfo) targetList.elementAt(r.nextInt(targetList.size()));
|
|
|
|
|
|
+ //
|
|
|
|
+ // That should do the trick. But we might not be able
|
|
|
|
+ // to pick any node if the target was out of bytes. As
|
|
|
|
+ // a last resort, pick the first valid one we can find.
|
|
|
|
+ //
|
|
|
|
+ for (Iterator it = targetList.iterator(); it.hasNext(); ) {
|
|
|
|
+ DatanodeInfo node = (DatanodeInfo) it.next();
|
|
|
|
+ if (node.getRemaining() > BLOCK_SIZE * MIN_BLOCKS_FOR_WRITE) {
|
|
|
|
+ return node;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ LOG.warning("Could not find any nodes with sufficient capacity");
|
|
|
|
+ return null;
|
|
|
|
+ } else {
|
|
|
|
+ LOG.warning("Zero targets found, forbidden1.size=" +
|
|
|
|
+ ( forbidden1 != null ? forbidden1.size() : 0 ) +
|
|
|
|
+ " forbidden2.size()=" +
|
|
|
|
+ ( forbidden2 != null ? forbidden2.size() : 0 ));
|
|
|
|
+ return null;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|