|
@@ -37,10 +37,6 @@ import java.util.logging.*;
|
|
|
***************************************************/
|
|
|
class FSNamesystem implements FSConstants {
|
|
|
public static final Logger LOG = LogFormatter.getLogger("org.apache.hadoop.fs.FSNamesystem");
|
|
|
- static {
|
|
|
- // for debugging the pending Creates problems
|
|
|
- LOG.setLevel(Level.FINE);
|
|
|
- }
|
|
|
|
|
|
//
|
|
|
// Stores the correct file name hierarchy
|
|
@@ -276,6 +272,10 @@ class FSNamesystem implements FSConstants {
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
+ public long getBlockSize(String filename) throws IOException {
|
|
|
+ return dir.getBlockSize(filename);
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Check whether the replication parameter is within the range
|
|
|
* determined by system configuration.
|
|
@@ -312,7 +312,8 @@ class FSNamesystem implements FSConstants {
|
|
|
UTF8 holder,
|
|
|
UTF8 clientMachine,
|
|
|
boolean overwrite,
|
|
|
- short replication
|
|
|
+ short replication,
|
|
|
+ long blockSize
|
|
|
) throws IOException {
|
|
|
NameNode.stateChangeLog.fine("DIR* NameSystem.startFile: file "
|
|
|
+src+" for "+holder+" at "+clientMachine);
|
|
@@ -340,7 +341,8 @@ class FSNamesystem implements FSConstants {
|
|
|
}
|
|
|
|
|
|
// Get the array of replication targets
|
|
|
- DatanodeInfo targets[] = chooseTargets(replication, null, clientMachine);
|
|
|
+ DatanodeInfo targets[] = chooseTargets(replication, null,
|
|
|
+ clientMachine, blockSize);
|
|
|
if (targets.length < this.minReplication) {
|
|
|
throw new IOException("failed to create file "+src
|
|
|
+" on client " + clientMachine
|
|
@@ -351,6 +353,7 @@ class FSNamesystem implements FSConstants {
|
|
|
// Reserve space for this pending file
|
|
|
pendingCreates.put(src,
|
|
|
new FileUnderConstruction(replication,
|
|
|
+ blockSize,
|
|
|
holder,
|
|
|
clientMachine));
|
|
|
NameNode.stateChangeLog.finer( "DIR* NameSystem.startFile: "
|
|
@@ -421,7 +424,7 @@ class FSNamesystem implements FSConstants {
|
|
|
|
|
|
// Get the array of replication targets
|
|
|
DatanodeInfo targets[] = chooseTargets(pendingFile.getReplication(),
|
|
|
- null, pendingFile.getClientMachine());
|
|
|
+ null, pendingFile.getClientMachine(), pendingFile.getBlockSize());
|
|
|
if (targets.length < this.minReplication) {
|
|
|
throw new IOException("File " + src + " could only be replicated to " +
|
|
|
targets.length + " nodes, instead of " +
|
|
@@ -1400,6 +1403,7 @@ class FSNamesystem implements FSConstants {
|
|
|
}
|
|
|
|
|
|
Block block = (Block) it.next();
|
|
|
+ long blockSize = block.getNumBytes();
|
|
|
FSDirectory.INode fileINode = dir.getFileByBlock(block);
|
|
|
if( fileINode == null ) { // block does not belong to any file
|
|
|
it.remove();
|
|
@@ -1411,7 +1415,13 @@ class FSNamesystem implements FSConstants {
|
|
|
if (containingNodes.contains(srcNode)
|
|
|
&& ( excessBlocks == null
|
|
|
|| ! excessBlocks.contains(block))) {
|
|
|
- DatanodeInfo targets[] = chooseTargets(Math.min(fileINode.getReplication() - containingNodes.size(), this.maxReplicationStreams - xmitsInProgress), containingNodes, null);
|
|
|
+ DatanodeInfo targets[] =
|
|
|
+ chooseTargets(Math.min(fileINode.getReplication()
|
|
|
+ - containingNodes.size(),
|
|
|
+ maxReplicationStreams
|
|
|
+ - xmitsInProgress),
|
|
|
+ containingNodes, null,
|
|
|
+ blockSize);
|
|
|
if (targets.length > 0) {
|
|
|
// Build items to return
|
|
|
replicateBlocks.add(block);
|
|
@@ -1481,7 +1491,8 @@ class FSNamesystem implements FSConstants {
|
|
|
* considered targets.
|
|
|
* @return array of DatanodeInfo instances uses as targets.
|
|
|
*/
|
|
|
- DatanodeInfo[] chooseTargets(int desiredReplicates, TreeSet forbiddenNodes, UTF8 clientMachine) {
|
|
|
+ DatanodeInfo[] chooseTargets(int desiredReplicates, TreeSet forbiddenNodes,
|
|
|
+ UTF8 clientMachine, long blockSize) {
|
|
|
if (desiredReplicates > datanodeMap.size()) {
|
|
|
LOG.warning("Replication requested of "+desiredReplicates
|
|
|
+" is larger than cluster size ("+datanodeMap.size()
|
|
@@ -1493,7 +1504,8 @@ class FSNamesystem implements FSConstants {
|
|
|
Vector targets = new Vector();
|
|
|
|
|
|
for (int i = 0; i < desiredReplicates; i++) {
|
|
|
- DatanodeInfo target = chooseTarget(forbiddenNodes, alreadyChosen, clientMachine);
|
|
|
+ DatanodeInfo target = chooseTarget(forbiddenNodes, alreadyChosen,
|
|
|
+ clientMachine, blockSize);
|
|
|
if (target == null)
|
|
|
break; // calling chooseTarget again won't help
|
|
|
targets.add(target);
|
|
@@ -1514,7 +1526,8 @@ class FSNamesystem implements FSConstants {
|
|
|
* @return DatanodeInfo instance to use or null if something went wrong
|
|
|
* (a log message is emitted if null is returned).
|
|
|
*/
|
|
|
- DatanodeInfo chooseTarget(TreeSet forbidden1, TreeSet forbidden2, UTF8 clientMachine) {
|
|
|
+ DatanodeInfo chooseTarget(TreeSet forbidden1, TreeSet forbidden2,
|
|
|
+ UTF8 clientMachine, long blockSize) {
|
|
|
//
|
|
|
// Check if there are any available targets at all
|
|
|
//
|
|
@@ -1565,7 +1578,7 @@ class FSNamesystem implements FSConstants {
|
|
|
for (Iterator it = targetList.iterator(); it.hasNext(); ) {
|
|
|
DatanodeInfo node = (DatanodeInfo) it.next();
|
|
|
if (clientMachine.equals(node.getHost())) {
|
|
|
- if (node.getRemaining() > BLOCK_SIZE * MIN_BLOCKS_FOR_WRITE) {
|
|
|
+ if (node.getRemaining() > blockSize * MIN_BLOCKS_FOR_WRITE) {
|
|
|
return node;
|
|
|
}
|
|
|
}
|
|
@@ -1577,7 +1590,7 @@ class FSNamesystem implements FSConstants {
|
|
|
//
|
|
|
for (Iterator it = targetList.iterator(); it.hasNext(); ) {
|
|
|
DatanodeInfo node = (DatanodeInfo) it.next();
|
|
|
- if (node.getRemaining() > BLOCK_SIZE * MIN_BLOCKS_FOR_WRITE) {
|
|
|
+ if (node.getRemaining() > blockSize * MIN_BLOCKS_FOR_WRITE) {
|
|
|
return node;
|
|
|
}
|
|
|
}
|
|
@@ -1589,7 +1602,7 @@ class FSNamesystem implements FSConstants {
|
|
|
//
|
|
|
for (Iterator it = targetList.iterator(); it.hasNext(); ) {
|
|
|
DatanodeInfo node = (DatanodeInfo) it.next();
|
|
|
- if (node.getRemaining() > BLOCK_SIZE) {
|
|
|
+ if (node.getRemaining() > blockSize) {
|
|
|
return node;
|
|
|
}
|
|
|
}
|
|
@@ -1615,14 +1628,17 @@ class FSNamesystem implements FSConstants {
|
|
|
*/
|
|
|
private class FileUnderConstruction {
|
|
|
private short blockReplication; // file replication
|
|
|
+ private long blockSize;
|
|
|
private Vector blocks;
|
|
|
private UTF8 clientName; // lease holder
|
|
|
private UTF8 clientMachine;
|
|
|
|
|
|
FileUnderConstruction(short replication,
|
|
|
+ long blockSize,
|
|
|
UTF8 clientName,
|
|
|
UTF8 clientMachine) throws IOException {
|
|
|
this.blockReplication = replication;
|
|
|
+ this.blockSize = blockSize;
|
|
|
this.blocks = new Vector();
|
|
|
this.clientName = clientName;
|
|
|
this.clientMachine = clientMachine;
|
|
@@ -1632,6 +1648,10 @@ class FSNamesystem implements FSConstants {
|
|
|
return this.blockReplication;
|
|
|
}
|
|
|
|
|
|
+ public long getBlockSize() {
|
|
|
+ return blockSize;
|
|
|
+ }
|
|
|
+
|
|
|
public Vector getBlocks() {
|
|
|
return blocks;
|
|
|
}
|