|
@@ -103,12 +103,6 @@ class FSNamesystem implements FSConstants {
|
|
|
private Map<String, Collection<Block>> excessReplicateMap =
|
|
|
new TreeMap<String, Collection<Block>>();
|
|
|
|
|
|
- //
|
|
|
- // Keeps track of files that are being created, plus the
|
|
|
- // blocks that make them up.
|
|
|
- //
|
|
|
- PendingCreates pendingCreates = new PendingCreates();
|
|
|
-
|
|
|
//
|
|
|
// Stats on overall usage
|
|
|
//
|
|
@@ -719,7 +713,15 @@ class FSNamesystem implements FSConstants {
|
|
|
throw new IOException(
|
|
|
text + " is less than the required minimum " + minReplication);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+ void startFile(String src, String holder, String clientMachine,
|
|
|
+ boolean overwrite, short replication, long blockSize
|
|
|
+ ) throws IOException {
|
|
|
+ startFileInternal(src, holder, clientMachine, overwrite,
|
|
|
+ replication, blockSize);
|
|
|
+ getEditLog().logSync();
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* The client would like to create a new block for the indicated
|
|
|
* filename. Return an array that consists of the block, plus a set
|
|
@@ -731,7 +733,7 @@ class FSNamesystem implements FSConstants {
|
|
|
* @throws IOException if the filename is invalid
|
|
|
* {@link FSDirectory#isValidToCreate(String)}.
|
|
|
*/
|
|
|
- synchronized void startFile(String src,
|
|
|
+ synchronized void startFileInternal(String src,
|
|
|
String holder,
|
|
|
String clientMachine,
|
|
|
boolean overwrite,
|
|
@@ -746,10 +748,11 @@ class FSNamesystem implements FSConstants {
|
|
|
throw new IOException("Invalid file name: " + src);
|
|
|
}
|
|
|
try {
|
|
|
- FileUnderConstruction pendingFile = pendingCreates.get(src);
|
|
|
- if (pendingFile != null) {
|
|
|
+ INode myFile = dir.getFileINode(src);
|
|
|
+ if (myFile != null && (myFile instanceof INodeFileUnderConstruction)) {
|
|
|
+ INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) myFile;
|
|
|
//
|
|
|
- // If the file exists in pendingCreate, then it must be in our
|
|
|
+ // If the file is under construction, then it must be in our
|
|
|
// leases. Find the appropriate lease record.
|
|
|
//
|
|
|
Lease lease = getLease(holder);
|
|
@@ -814,15 +817,6 @@ class FSNamesystem implements FSConstants {
|
|
|
DatanodeDescriptor clientNode =
|
|
|
host2DataNodeMap.getDatanodeByHost(clientMachine);
|
|
|
|
|
|
- // Reserve space for this pending file
|
|
|
- pendingCreates.put(src,
|
|
|
- new FileUnderConstruction(replication,
|
|
|
- blockSize,
|
|
|
- holder,
|
|
|
- clientMachine,
|
|
|
- clientNode));
|
|
|
- NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: "
|
|
|
- +"add "+src+" to pendingCreates for "+holder);
|
|
|
synchronized (leases) {
|
|
|
Lease lease = getLease(holder);
|
|
|
if (lease == null) {
|
|
@@ -836,20 +830,27 @@ class FSNamesystem implements FSConstants {
|
|
|
}
|
|
|
lease.startedCreate(src);
|
|
|
}
|
|
|
+
|
|
|
+ //
|
|
|
+ // Now we can add the name to the filesystem. This file has no
|
|
|
+ // blocks associated with it.
|
|
|
+ //
|
|
|
+ INode newNode = dir.addFile(src, replication, blockSize,
|
|
|
+ holder,
|
|
|
+ clientMachine,
|
|
|
+ clientNode);
|
|
|
+ if (newNode == null) {
|
|
|
+ throw new IOException("DIR* NameSystem.startFile: " +
|
|
|
+ "Unable to add file to namespace.");
|
|
|
+ }
|
|
|
} catch (IOException ie) {
|
|
|
NameNode.stateChangeLog.warn("DIR* NameSystem.startFile: "
|
|
|
+ie.getMessage());
|
|
|
throw ie;
|
|
|
}
|
|
|
|
|
|
- //
|
|
|
- // Now we can add the name to the filesystem. This file has no
|
|
|
- // blocks associated with it.
|
|
|
- //
|
|
|
- if (!dir.addFile(src, new Block[0], replication, blockSize)) {
|
|
|
- throw new IOException("DIR* NameSystem.startFile: " +
|
|
|
- "Unable to add file to namespace.");
|
|
|
- }
|
|
|
+ NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: "
|
|
|
+ +"add "+src+" to namespace for "+holder);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -882,7 +883,7 @@ class FSNamesystem implements FSConstants {
|
|
|
//
|
|
|
// make sure that we still have the lease on this file
|
|
|
//
|
|
|
- FileUnderConstruction pendingFile = pendingCreates.get(src);
|
|
|
+ INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) dir.getFileINode(src);
|
|
|
if (pendingFile == null) {
|
|
|
throw new LeaseExpiredException("No lease on " + src);
|
|
|
}
|
|
@@ -898,11 +899,11 @@ class FSNamesystem implements FSConstants {
|
|
|
if (!checkFileProgress(pendingFile, false)) {
|
|
|
throw new NotReplicatedYetException("Not replicated yet:" + src);
|
|
|
}
|
|
|
- fileLength = pendingFile.computeFileLength();
|
|
|
- blockSize = pendingFile.getBlockSize();
|
|
|
+ fileLength = pendingFile.computeContentsLength();
|
|
|
+ blockSize = pendingFile.getPreferredBlockSize();
|
|
|
clientNode = pendingFile.getClientNode();
|
|
|
replication = (int)pendingFile.getReplication();
|
|
|
- newBlock = allocateBlock(src);
|
|
|
+ newBlock = allocateBlock(src, pendingFile);
|
|
|
}
|
|
|
|
|
|
DatanodeDescriptor targets[] = replicator.chooseTarget(replication,
|
|
@@ -928,13 +929,14 @@ class FSNamesystem implements FSConstants {
|
|
|
//
|
|
|
NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
|
|
|
+b.getBlockName()+"of file "+src);
|
|
|
- boolean status = pendingCreates.removeBlock(src, b);
|
|
|
- if (status) {
|
|
|
- NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
|
|
|
+ INode file = dir.getFileINode(src);
|
|
|
+ if (file != null) {
|
|
|
+ dir.removeBlock(src, file, b);
|
|
|
+ }
|
|
|
+ NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
|
|
|
+ b.getBlockName()
|
|
|
+ " is removed from pendingCreates");
|
|
|
- }
|
|
|
- return status;
|
|
|
+ return true;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -964,8 +966,7 @@ class FSNamesystem implements FSConstants {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Finalize the created file and make it world-accessible. The
|
|
|
- * FSNamesystem will already know the blocks that make up the file.
|
|
|
+ * The FSNamesystem will already know the blocks that make up the file.
|
|
|
* Before we return, we make sure that all the file's blocks have
|
|
|
* been reported by datanodes and are replicated correctly.
|
|
|
*/
|
|
@@ -980,11 +981,11 @@ class FSNamesystem implements FSConstants {
|
|
|
NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " + src + " for " + holder);
|
|
|
if (isInSafeMode())
|
|
|
throw new SafeModeException("Cannot complete file " + src, safeMode);
|
|
|
- FileUnderConstruction pendingFile = pendingCreates.get(src);
|
|
|
+ INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) dir.getFileINode(src);
|
|
|
|
|
|
Block[] fileBlocks = dir.getFileBlocks(src);
|
|
|
- if ((fileBlocks != null && fileBlocks.length > 0) ||
|
|
|
- pendingFile == null) {
|
|
|
+ if (fileBlocks == null || fileBlocks.length == 0 ||
|
|
|
+ pendingFile == null) {
|
|
|
NameNode.stateChangeLog.warn("DIR* NameSystem.completeFile: "
|
|
|
+ "failed to complete " + src
|
|
|
+ " because dir.getFileBlocks() is " +
|
|
@@ -999,36 +1000,16 @@ class FSNamesystem implements FSConstants {
|
|
|
return STILL_WAITING;
|
|
|
}
|
|
|
|
|
|
- Collection<Block> blocks = pendingFile.getBlocks();
|
|
|
- int nrBlocks = blocks.size();
|
|
|
- Block pendingBlocks[] = new Block[nrBlocks];
|
|
|
+ // The file is no longer pending.
|
|
|
+ // Create permanent INode, update blockmap
|
|
|
+ INodeFile newFile = pendingFile.convertToInodeFile();
|
|
|
+ dir.replaceNode(src, pendingFile, newFile);
|
|
|
|
|
|
- //
|
|
|
- // We have the pending blocks, but they won't have
|
|
|
- // length info in them (as they were allocated before
|
|
|
- // data-write took place). Find the block stored in
|
|
|
- // node descriptor.
|
|
|
- //
|
|
|
- int idx = 0;
|
|
|
- for (Block b : blocks) {
|
|
|
- Block storedBlock = blocksMap.getStoredBlock(b);
|
|
|
- // according to checkFileProgress() every block is present & replicated
|
|
|
- assert storedBlock != null : "Missing block " + b.getBlockName();
|
|
|
- pendingBlocks[idx++] = storedBlock;
|
|
|
- }
|
|
|
-
|
|
|
- //
|
|
|
- // add blocks to the file
|
|
|
- //
|
|
|
- if (!dir.addBlocks(src, pendingBlocks)) {
|
|
|
- return OPERATION_FAILED;
|
|
|
- }
|
|
|
+ // persist block allocations for this file
|
|
|
+ dir.persistBlocks(src, newFile);
|
|
|
|
|
|
- // The file is no longer pending
|
|
|
- pendingCreates.remove(src);
|
|
|
- NameNode.stateChangeLog.debug(
|
|
|
- "DIR* NameSystem.completeFile: " + src
|
|
|
- + " is removed from pendingCreates");
|
|
|
+ NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " + src
|
|
|
+ + " blocklist persisted");
|
|
|
|
|
|
synchronized (leases) {
|
|
|
Lease lease = getLease(holder);
|
|
@@ -1051,6 +1032,8 @@ class FSNamesystem implements FSConstants {
|
|
|
// Now that the file is real, we need to be sure to replicate
|
|
|
// the blocks.
|
|
|
int numExpectedReplicas = pendingFile.getReplication();
|
|
|
+ Block[] pendingBlocks = pendingFile.getBlocks();
|
|
|
+ int nrBlocks = pendingBlocks.length;
|
|
|
for (int i = 0; i < nrBlocks; i++) {
|
|
|
// filter out containingNodes that are marked for decommission.
|
|
|
NumberReplicas number = countNodes(pendingBlocks[i]);
|
|
@@ -1069,15 +1052,14 @@ class FSNamesystem implements FSConstants {
|
|
|
/**
|
|
|
* Allocate a block at the given pending filename
|
|
|
*/
|
|
|
- private Block allocateBlock(String src) throws IOException {
|
|
|
+ private Block allocateBlock(String src, INode file) throws IOException {
|
|
|
Block b = null;
|
|
|
do {
|
|
|
b = new Block(FSNamesystem.randBlockId.nextLong(), 0);
|
|
|
} while (isValidBlock(b));
|
|
|
- pendingCreates.addBlock(src, b);
|
|
|
+ b = dir.addBlock(src, file, b);
|
|
|
NameNode.stateChangeLog.info("BLOCK* NameSystem.allocateBlock: "
|
|
|
- +src+ ". "+b.getBlockName()+
|
|
|
- " is created and added to pendingCreates and pendingCreateBlocks");
|
|
|
+ +src+ ". "+b.getBlockName());
|
|
|
return b;
|
|
|
}
|
|
|
|
|
@@ -1086,13 +1068,13 @@ class FSNamesystem implements FSConstants {
|
|
|
* replicated. If not, return false. If checkall is true, then check
|
|
|
* all blocks, otherwise check only penultimate block.
|
|
|
*/
|
|
|
- synchronized boolean checkFileProgress(FileUnderConstruction v, boolean checkall) {
|
|
|
+ synchronized boolean checkFileProgress(INodeFile v, boolean checkall) {
|
|
|
if (checkall) {
|
|
|
//
|
|
|
// check all blocks of the file.
|
|
|
//
|
|
|
- for (Iterator<Block> it = v.getBlocks().iterator(); it.hasNext();) {
|
|
|
- if (blocksMap.numNodes(it.next()) < this.minReplication) {
|
|
|
+ for (Block block: v.getBlocks()) {
|
|
|
+ if (blocksMap.numNodes(block) < this.minReplication) {
|
|
|
return false;
|
|
|
}
|
|
|
}
|
|
@@ -1490,21 +1472,36 @@ class FSNamesystem implements FSConstants {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Release a pending file creation lock.
|
|
|
+ * Convert a file that is being written into a permanent, immutable file.
|
|
|
* @param src The filename
|
|
|
* @param holder The datanode that was creating the file
|
|
|
*/
|
|
|
private void internalReleaseCreate(String src, String holder) throws IOException {
|
|
|
- boolean status = pendingCreates.remove(src);
|
|
|
- if (status) {
|
|
|
- NameNode.stateChangeLog.debug("DIR* NameSystem.internalReleaseCreate: " + src
|
|
|
- + " is removed from pendingCreates for "
|
|
|
- + holder + " (failure)");
|
|
|
- } else {
|
|
|
- NameNode.stateChangeLog.warn("DIR* NameSystem.internalReleaseCreate: "
|
|
|
- + "attempt to release a create lock on "+ src
|
|
|
- + " that was not in pedingCreates");
|
|
|
+ INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) dir.getFileINode(src);
|
|
|
+
|
|
|
+ // The last block that was allocated might not have been used by the
|
|
|
+ // client. In this case, the size of the last block would be 0. A fsck
|
|
|
+ // will report this block as a missing block because no datanodes have it.
|
|
|
+ // Delete this block.
|
|
|
+ Block[] blocks = pendingFile.getBlocks();
|
|
|
+ if (blocks != null && blocks.length > 1) {
|
|
|
+ Block last = blocks[blocks.length - 1];
|
|
|
+ if (last.getNumBytes() == 0) {
|
|
|
+ pendingFile.removeBlock(last);
|
|
|
+ }
|
|
|
}
|
|
|
+
|
|
|
+ // The file is no longer pending.
|
|
|
+ // Create permanent INode, update blockmap
|
|
|
+ INodeFile newFile = pendingFile.convertToInodeFile();
|
|
|
+ dir.replaceNode(src, pendingFile, newFile);
|
|
|
+
|
|
|
+ // persist block allocations for this file
|
|
|
+ dir.persistBlocks(src, newFile);
|
|
|
+
|
|
|
+ NameNode.stateChangeLog.debug("DIR* NameSystem.internalReleaseCreate: " +
|
|
|
+ src + " is no longer written to by " +
|
|
|
+ holder);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -2161,7 +2158,18 @@ class FSNamesystem implements FSConstants {
|
|
|
Block storedBlock = blocksMap.getStoredBlock(block); //extra look up!
|
|
|
if (storedBlock != null && block != storedBlock) {
|
|
|
if (block.getNumBytes() > 0) {
|
|
|
- storedBlock.setNumBytes(block.getNumBytes());
|
|
|
+ long cursize = storedBlock.getNumBytes();
|
|
|
+ if (cursize == 0) {
|
|
|
+ storedBlock.setNumBytes(block.getNumBytes());
|
|
|
+ } else if (cursize != block.getNumBytes()) {
|
|
|
+ LOG.warn("Inconsistent size for block " + block +
|
|
|
+ " reported from " + node.getName() +
|
|
|
+ " current size is " + cursize +
|
|
|
+ " reported size is " + block.getNumBytes());
|
|
|
+ // Accept this block even if there is a problem with its
|
|
|
+ // size. Clients should detect data corruption because of
|
|
|
+ // CRC mismatch.
|
|
|
+ }
|
|
|
}
|
|
|
block = storedBlock;
|
|
|
}
|
|
@@ -2185,8 +2193,13 @@ class FSNamesystem implements FSConstants {
|
|
|
+ block.getBlockName() + " on " + node.getName());
|
|
|
}
|
|
|
|
|
|
- if (fileINode == null) // block does not belong to any file
|
|
|
+ //
|
|
|
+ // if file is being actively written to, then do not check
|
|
|
+ // replication-factor here. It will be checked when the file is closed.
|
|
|
+ //
|
|
|
+ if (fileINode == null || fileINode instanceof INodeFileUnderConstruction) {
|
|
|
return block;
|
|
|
+ }
|
|
|
|
|
|
// filter out containingNodes that are marked for decommission.
|
|
|
NumberReplicas num = countNodes(block);
|
|
@@ -3460,8 +3473,7 @@ class FSNamesystem implements FSConstants {
|
|
|
* Returns whether the given block is one pointed-to by a file.
|
|
|
*/
|
|
|
private boolean isValidBlock(Block b) {
|
|
|
- return (blocksMap.getINode(b) != null ||
|
|
|
- pendingCreates.contains(b));
|
|
|
+ return (blocksMap.getINode(b) != null);
|
|
|
}
|
|
|
|
|
|
// Distributed upgrade manager
|