|
@@ -1772,16 +1772,18 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|
* Create a new file entry in the namespace.
|
|
* Create a new file entry in the namespace.
|
|
*
|
|
*
|
|
* For description of parameters and exceptions thrown see
|
|
* For description of parameters and exceptions thrown see
|
|
- * {@link ClientProtocol#create()}
|
|
|
|
|
|
+ * {@link ClientProtocol#create()}, except it returns valid file status
|
|
|
|
+ * upon success
|
|
*/
|
|
*/
|
|
- void startFile(String src, PermissionStatus permissions, String holder,
|
|
|
|
- String clientMachine, EnumSet<CreateFlag> flag, boolean createParent,
|
|
|
|
- short replication, long blockSize) throws AccessControlException,
|
|
|
|
- SafeModeException, FileAlreadyExistsException, UnresolvedLinkException,
|
|
|
|
|
|
+ HdfsFileStatus startFile(String src, PermissionStatus permissions,
|
|
|
|
+ String holder, String clientMachine, EnumSet<CreateFlag> flag,
|
|
|
|
+ boolean createParent, short replication, long blockSize)
|
|
|
|
+ throws AccessControlException, SafeModeException,
|
|
|
|
+ FileAlreadyExistsException, UnresolvedLinkException,
|
|
FileNotFoundException, ParentNotDirectoryException, IOException {
|
|
FileNotFoundException, ParentNotDirectoryException, IOException {
|
|
try {
|
|
try {
|
|
- startFileInt(src, permissions, holder, clientMachine, flag, createParent,
|
|
|
|
- replication, blockSize);
|
|
|
|
|
|
+ return startFileInt(src, permissions, holder, clientMachine, flag,
|
|
|
|
+ createParent, replication, blockSize);
|
|
} catch (AccessControlException e) {
|
|
} catch (AccessControlException e) {
|
|
if (isAuditEnabled() && isExternalInvocation()) {
|
|
if (isAuditEnabled() && isExternalInvocation()) {
|
|
logAuditEvent(false, UserGroupInformation.getCurrentUser(),
|
|
logAuditEvent(false, UserGroupInformation.getCurrentUser(),
|
|
@@ -1792,18 +1794,21 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- private void startFileInt(String src, PermissionStatus permissions, String holder,
|
|
|
|
- String clientMachine, EnumSet<CreateFlag> flag, boolean createParent,
|
|
|
|
- short replication, long blockSize) throws AccessControlException,
|
|
|
|
- SafeModeException, FileAlreadyExistsException, UnresolvedLinkException,
|
|
|
|
|
|
+ private HdfsFileStatus startFileInt(String src, PermissionStatus permissions,
|
|
|
|
+ String holder, String clientMachine, EnumSet<CreateFlag> flag,
|
|
|
|
+ boolean createParent, short replication, long blockSize)
|
|
|
|
+ throws AccessControlException, SafeModeException,
|
|
|
|
+ FileAlreadyExistsException, UnresolvedLinkException,
|
|
FileNotFoundException, ParentNotDirectoryException, IOException {
|
|
FileNotFoundException, ParentNotDirectoryException, IOException {
|
|
boolean skipSync = false;
|
|
boolean skipSync = false;
|
|
|
|
+ final HdfsFileStatus stat;
|
|
writeLock();
|
|
writeLock();
|
|
try {
|
|
try {
|
|
checkOperation(OperationCategory.WRITE);
|
|
checkOperation(OperationCategory.WRITE);
|
|
|
|
|
|
startFileInternal(src, permissions, holder, clientMachine, flag,
|
|
startFileInternal(src, permissions, holder, clientMachine, flag,
|
|
createParent, replication, blockSize);
|
|
createParent, replication, blockSize);
|
|
|
|
+ stat = dir.getFileInfo(src, false);
|
|
} catch (StandbyException se) {
|
|
} catch (StandbyException se) {
|
|
skipSync = true;
|
|
skipSync = true;
|
|
throw se;
|
|
throw se;
|
|
@@ -1817,11 +1822,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|
}
|
|
}
|
|
|
|
|
|
if (isAuditEnabled() && isExternalInvocation()) {
|
|
if (isAuditEnabled() && isExternalInvocation()) {
|
|
- final HdfsFileStatus stat = dir.getFileInfo(src, false);
|
|
|
|
logAuditEvent(UserGroupInformation.getCurrentUser(),
|
|
logAuditEvent(UserGroupInformation.getCurrentUser(),
|
|
getRemoteIp(),
|
|
getRemoteIp(),
|
|
"create", src, null, stat);
|
|
"create", src, null, stat);
|
|
}
|
|
}
|
|
|
|
+ return stat;
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -2192,20 +2197,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|
* are replicated. Will return an empty 2-elt array if we want the
|
|
* are replicated. Will return an empty 2-elt array if we want the
|
|
* client to "try again later".
|
|
* client to "try again later".
|
|
*/
|
|
*/
|
|
- LocatedBlock getAdditionalBlock(String src,
|
|
|
|
- String clientName,
|
|
|
|
- ExtendedBlock previous,
|
|
|
|
- HashMap<Node, Node> excludedNodes
|
|
|
|
- )
|
|
|
|
|
|
+ LocatedBlock getAdditionalBlock(String src, long fileId, String clientName,
|
|
|
|
+ ExtendedBlock previous, HashMap<Node, Node> excludedNodes)
|
|
throws LeaseExpiredException, NotReplicatedYetException,
|
|
throws LeaseExpiredException, NotReplicatedYetException,
|
|
QuotaExceededException, SafeModeException, UnresolvedLinkException,
|
|
QuotaExceededException, SafeModeException, UnresolvedLinkException,
|
|
IOException {
|
|
IOException {
|
|
- checkBlock(previous);
|
|
|
|
- Block previousBlock = ExtendedBlock.getLocalBlock(previous);
|
|
|
|
- long fileLength, blockSize;
|
|
|
|
|
|
+ long blockSize;
|
|
int replication;
|
|
int replication;
|
|
DatanodeDescriptor clientNode = null;
|
|
DatanodeDescriptor clientNode = null;
|
|
- Block newBlock = null;
|
|
|
|
|
|
|
|
if(NameNode.stateChangeLog.isDebugEnabled()) {
|
|
if(NameNode.stateChangeLog.isDebugEnabled()) {
|
|
NameNode.stateChangeLog.debug(
|
|
NameNode.stateChangeLog.debug(
|
|
@@ -2213,119 +2212,61 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|
+src+" for "+clientName);
|
|
+src+" for "+clientName);
|
|
}
|
|
}
|
|
|
|
|
|
- writeLock();
|
|
|
|
|
|
+ // Part I. Analyze the state of the file with respect to the input data.
|
|
|
|
+ readLock();
|
|
try {
|
|
try {
|
|
- checkOperation(OperationCategory.WRITE);
|
|
|
|
|
|
+ LocatedBlock[] onRetryBlock = new LocatedBlock[1];
|
|
|
|
+ final INode[] inodes = analyzeFileState(
|
|
|
|
+ src, fileId, clientName, previous, onRetryBlock).getINodes();
|
|
|
|
+ final INodeFileUnderConstruction pendingFile =
|
|
|
|
+ (INodeFileUnderConstruction) inodes[inodes.length - 1];
|
|
|
|
|
|
- if (isInSafeMode()) {
|
|
|
|
- throw new SafeModeException("Cannot add block to " + src, safeMode);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- // have we exceeded the configured limit of fs objects.
|
|
|
|
- checkFsObjectLimit();
|
|
|
|
-
|
|
|
|
- INodeFileUnderConstruction pendingFile = checkLease(src, clientName);
|
|
|
|
- BlockInfo lastBlockInFile = pendingFile.getLastBlock();
|
|
|
|
- if (!Block.matchingIdAndGenStamp(previousBlock, lastBlockInFile)) {
|
|
|
|
- // The block that the client claims is the current last block
|
|
|
|
- // doesn't match up with what we think is the last block. There are
|
|
|
|
- // three possibilities:
|
|
|
|
- // 1) This is the first block allocation of an append() pipeline
|
|
|
|
- // which started appending exactly at a block boundary.
|
|
|
|
- // In this case, the client isn't passed the previous block,
|
|
|
|
- // so it makes the allocateBlock() call with previous=null.
|
|
|
|
- // We can distinguish this since the last block of the file
|
|
|
|
- // will be exactly a full block.
|
|
|
|
- // 2) This is a retry from a client that missed the response of a
|
|
|
|
- // prior getAdditionalBlock() call, perhaps because of a network
|
|
|
|
- // timeout, or because of an HA failover. In that case, we know
|
|
|
|
- // by the fact that the client is re-issuing the RPC that it
|
|
|
|
- // never began to write to the old block. Hence it is safe to
|
|
|
|
- // abandon it and allocate a new one.
|
|
|
|
- // 3) This is an entirely bogus request/bug -- we should error out
|
|
|
|
- // rather than potentially appending a new block with an empty
|
|
|
|
- // one in the middle, etc
|
|
|
|
-
|
|
|
|
- BlockInfo penultimateBlock = pendingFile.getPenultimateBlock();
|
|
|
|
- if (previous == null &&
|
|
|
|
- lastBlockInFile != null &&
|
|
|
|
- lastBlockInFile.getNumBytes() == pendingFile.getPreferredBlockSize() &&
|
|
|
|
- lastBlockInFile.isComplete()) {
|
|
|
|
- // Case 1
|
|
|
|
- if (NameNode.stateChangeLog.isDebugEnabled()) {
|
|
|
|
- NameNode.stateChangeLog.debug(
|
|
|
|
- "BLOCK* NameSystem.allocateBlock: handling block allocation" +
|
|
|
|
- " writing to a file with a complete previous block: src=" +
|
|
|
|
- src + " lastBlock=" + lastBlockInFile);
|
|
|
|
- }
|
|
|
|
- } else if (Block.matchingIdAndGenStamp(penultimateBlock, previousBlock)) {
|
|
|
|
- // Case 2
|
|
|
|
- if (lastBlockInFile.getNumBytes() != 0) {
|
|
|
|
- throw new IOException(
|
|
|
|
- "Request looked like a retry to allocate block " +
|
|
|
|
- lastBlockInFile + " but it already contains " +
|
|
|
|
- lastBlockInFile.getNumBytes() + " bytes");
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- // The retry case ("b" above) -- abandon the old block.
|
|
|
|
- NameNode.stateChangeLog.info("BLOCK* allocateBlock: " +
|
|
|
|
- "caught retry for allocation of a new block in " +
|
|
|
|
- src + ". Abandoning old block " + lastBlockInFile);
|
|
|
|
- dir.removeBlock(src, pendingFile, lastBlockInFile);
|
|
|
|
- dir.persistBlocks(src, pendingFile);
|
|
|
|
- } else {
|
|
|
|
-
|
|
|
|
- throw new IOException("Cannot allocate block in " + src + ": " +
|
|
|
|
- "passed 'previous' block " + previous + " does not match actual " +
|
|
|
|
- "last block in file " + lastBlockInFile);
|
|
|
|
- }
|
|
|
|
|
|
+ if(onRetryBlock[0] != null) {
|
|
|
|
+ // This is a retry. Just return the last block.
|
|
|
|
+ return onRetryBlock[0];
|
|
}
|
|
}
|
|
|
|
|
|
- // commit the last block and complete it if it has minimum replicas
|
|
|
|
- commitOrCompleteLastBlock(pendingFile, previousBlock);
|
|
|
|
-
|
|
|
|
- //
|
|
|
|
- // If we fail this, bad things happen!
|
|
|
|
- //
|
|
|
|
- if (!checkFileProgress(pendingFile, false)) {
|
|
|
|
- throw new NotReplicatedYetException("Not replicated yet:" + src);
|
|
|
|
- }
|
|
|
|
- fileLength = pendingFile.computeContentSummary().getLength();
|
|
|
|
blockSize = pendingFile.getPreferredBlockSize();
|
|
blockSize = pendingFile.getPreferredBlockSize();
|
|
clientNode = pendingFile.getClientNode();
|
|
clientNode = pendingFile.getClientNode();
|
|
replication = pendingFile.getBlockReplication();
|
|
replication = pendingFile.getBlockReplication();
|
|
} finally {
|
|
} finally {
|
|
- writeUnlock();
|
|
|
|
|
|
+ readUnlock();
|
|
}
|
|
}
|
|
|
|
|
|
// choose targets for the new block to be allocated.
|
|
// choose targets for the new block to be allocated.
|
|
- final DatanodeDescriptor targets[] = blockManager.chooseTarget(
|
|
|
|
|
|
+ final DatanodeDescriptor targets[] = getBlockManager().chooseTarget(
|
|
src, replication, clientNode, excludedNodes, blockSize);
|
|
src, replication, clientNode, excludedNodes, blockSize);
|
|
|
|
|
|
- // Allocate a new block and record it in the INode.
|
|
|
|
|
|
+ // Part II.
|
|
|
|
+ // Allocate a new block, add it to the INode and the BlocksMap.
|
|
|
|
+ Block newBlock = null;
|
|
|
|
+ long offset;
|
|
writeLock();
|
|
writeLock();
|
|
try {
|
|
try {
|
|
- checkOperation(OperationCategory.WRITE);
|
|
|
|
- if (isInSafeMode()) {
|
|
|
|
- throw new SafeModeException("Cannot add block to " + src, safeMode);
|
|
|
|
- }
|
|
|
|
|
|
+ // Run the full analysis again, since things could have changed
|
|
|
|
+ // while chooseTarget() was executing.
|
|
|
|
+ LocatedBlock[] onRetryBlock = new LocatedBlock[1];
|
|
|
|
+ INodesInPath inodesInPath =
|
|
|
|
+ analyzeFileState(src, fileId, clientName, previous, onRetryBlock);
|
|
|
|
+ INode[] inodes = inodesInPath.getINodes();
|
|
|
|
+ final INodeFileUnderConstruction pendingFile =
|
|
|
|
+ (INodeFileUnderConstruction) inodes[inodes.length - 1];
|
|
|
|
|
|
- final INodesInPath inodesInPath = dir.rootDir.getExistingPathINodes(src, true);
|
|
|
|
- final INode[] inodes = inodesInPath.getINodes();
|
|
|
|
- final INodeFileUnderConstruction pendingFile
|
|
|
|
- = checkLease(src, clientName, inodes[inodes.length - 1]);
|
|
|
|
-
|
|
|
|
- if (!checkFileProgress(pendingFile, false)) {
|
|
|
|
- throw new NotReplicatedYetException("Not replicated yet:" + src);
|
|
|
|
|
|
+ if(onRetryBlock[0] != null) {
|
|
|
|
+ // This is a retry. Just return the last block.
|
|
|
|
+ return onRetryBlock[0];
|
|
}
|
|
}
|
|
|
|
|
|
- // allocate new block record block locations in INode.
|
|
|
|
- newBlock = allocateBlock(src, inodesInPath, targets);
|
|
|
|
-
|
|
|
|
- for (DatanodeDescriptor dn : targets) {
|
|
|
|
- dn.incBlocksScheduled();
|
|
|
|
- }
|
|
|
|
|
|
+ // commit the last block and complete it if it has minimum replicas
|
|
|
|
+ commitOrCompleteLastBlock(pendingFile,
|
|
|
|
+ ExtendedBlock.getLocalBlock(previous));
|
|
|
|
+
|
|
|
|
+ // allocate new block, record block locations in INode.
|
|
|
|
+ newBlock = createNewBlock();
|
|
|
|
+ saveAllocatedBlock(src, inodesInPath, newBlock, targets);
|
|
|
|
+
|
|
dir.persistBlocks(src, pendingFile);
|
|
dir.persistBlocks(src, pendingFile);
|
|
|
|
+ offset = pendingFile.computeFileSize(true);
|
|
} finally {
|
|
} finally {
|
|
writeUnlock();
|
|
writeUnlock();
|
|
}
|
|
}
|
|
@@ -2333,10 +2274,115 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|
getEditLog().logSync();
|
|
getEditLog().logSync();
|
|
}
|
|
}
|
|
|
|
|
|
- // Create next block
|
|
|
|
- LocatedBlock b = new LocatedBlock(getExtendedBlock(newBlock), targets, fileLength);
|
|
|
|
- blockManager.setBlockToken(b, BlockTokenSecretManager.AccessMode.WRITE);
|
|
|
|
- return b;
|
|
|
|
|
|
+ // Return located block
|
|
|
|
+ return makeLocatedBlock(newBlock, targets, offset);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ INodesInPath analyzeFileState(String src,
|
|
|
|
+ long fileId,
|
|
|
|
+ String clientName,
|
|
|
|
+ ExtendedBlock previous,
|
|
|
|
+ LocatedBlock[] onRetryBlock)
|
|
|
|
+ throws IOException {
|
|
|
|
+ assert hasReadOrWriteLock();
|
|
|
|
+
|
|
|
|
+ checkBlock(previous);
|
|
|
|
+ onRetryBlock[0] = null;
|
|
|
|
+ checkOperation(OperationCategory.WRITE);
|
|
|
|
+ if (isInSafeMode()) {
|
|
|
|
+ throw new SafeModeException("Cannot add block to " + src, safeMode);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // have we exceeded the configured limit of fs objects.
|
|
|
|
+ checkFsObjectLimit();
|
|
|
|
+
|
|
|
|
+ Block previousBlock = ExtendedBlock.getLocalBlock(previous);
|
|
|
|
+ final INodesInPath inodesInPath =
|
|
|
|
+ dir.rootDir.getExistingPathINodes(src, true);
|
|
|
|
+ final INode[] inodes = inodesInPath.getINodes();
|
|
|
|
+ final INodeFileUnderConstruction pendingFile
|
|
|
|
+ = checkLease(src, fileId, clientName, inodes[inodes.length - 1]);
|
|
|
|
+ BlockInfo lastBlockInFile = pendingFile.getLastBlock();
|
|
|
|
+ if (!Block.matchingIdAndGenStamp(previousBlock, lastBlockInFile)) {
|
|
|
|
+ // The block that the client claims is the current last block
|
|
|
|
+ // doesn't match up with what we think is the last block. There are
|
|
|
|
+ // four possibilities:
|
|
|
|
+ // 1) This is the first block allocation of an append() pipeline
|
|
|
|
+ // which started appending exactly at a block boundary.
|
|
|
|
+ // In this case, the client isn't passed the previous block,
|
|
|
|
+ // so it makes the allocateBlock() call with previous=null.
|
|
|
|
+ // We can distinguish this since the last block of the file
|
|
|
|
+ // will be exactly a full block.
|
|
|
|
+ // 2) This is a retry from a client that missed the response of a
|
|
|
|
+ // prior getAdditionalBlock() call, perhaps because of a network
|
|
|
|
+ // timeout, or because of an HA failover. In that case, we know
|
|
|
|
+ // by the fact that the client is re-issuing the RPC that it
|
|
|
|
+ // never began to write to the old block. Hence it is safe to
|
|
|
|
+ // to return the existing block.
|
|
|
|
+ // 3) This is an entirely bogus request/bug -- we should error out
|
|
|
|
+ // rather than potentially appending a new block with an empty
|
|
|
|
+ // one in the middle, etc
|
|
|
|
+ // 4) This is a retry from a client that timed out while
|
|
|
|
+ // the prior getAdditionalBlock() is still being processed,
|
|
|
|
+ // currently working on chooseTarget().
|
|
|
|
+ // There are no means to distinguish between the first and
|
|
|
|
+ // the second attempts in Part I, because the first one hasn't
|
|
|
|
+ // changed the namesystem state yet.
|
|
|
|
+ // We run this analysis again in Part II where case 4 is impossible.
|
|
|
|
+
|
|
|
|
+ BlockInfo penultimateBlock = pendingFile.getPenultimateBlock();
|
|
|
|
+ if (previous == null &&
|
|
|
|
+ lastBlockInFile != null &&
|
|
|
|
+ lastBlockInFile.getNumBytes() == pendingFile.getPreferredBlockSize() &&
|
|
|
|
+ lastBlockInFile.isComplete()) {
|
|
|
|
+ // Case 1
|
|
|
|
+ if (NameNode.stateChangeLog.isDebugEnabled()) {
|
|
|
|
+ NameNode.stateChangeLog.debug(
|
|
|
|
+ "BLOCK* NameSystem.allocateBlock: handling block allocation" +
|
|
|
|
+ " writing to a file with a complete previous block: src=" +
|
|
|
|
+ src + " lastBlock=" + lastBlockInFile);
|
|
|
|
+ }
|
|
|
|
+ } else if (Block.matchingIdAndGenStamp(penultimateBlock, previousBlock)) {
|
|
|
|
+ if (lastBlockInFile.getNumBytes() != 0) {
|
|
|
|
+ throw new IOException(
|
|
|
|
+ "Request looked like a retry to allocate block " +
|
|
|
|
+ lastBlockInFile + " but it already contains " +
|
|
|
|
+ lastBlockInFile.getNumBytes() + " bytes");
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Case 2
|
|
|
|
+ // Return the last block.
|
|
|
|
+ NameNode.stateChangeLog.info("BLOCK* allocateBlock: " +
|
|
|
|
+ "caught retry for allocation of a new block in " +
|
|
|
|
+ src + ". Returning previously allocated block " + lastBlockInFile);
|
|
|
|
+ long offset = pendingFile.computeFileSize(true);
|
|
|
|
+ onRetryBlock[0] = makeLocatedBlock(lastBlockInFile,
|
|
|
|
+ ((BlockInfoUnderConstruction)lastBlockInFile).getExpectedLocations(),
|
|
|
|
+ offset);
|
|
|
|
+ return inodesInPath;
|
|
|
|
+ } else {
|
|
|
|
+ // Case 3
|
|
|
|
+ throw new IOException("Cannot allocate block in " + src + ": " +
|
|
|
|
+ "passed 'previous' block " + previous + " does not match actual " +
|
|
|
|
+ "last block in file " + lastBlockInFile);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Check if the penultimate block is minimally replicated
|
|
|
|
+ if (!checkFileProgress(pendingFile, false)) {
|
|
|
|
+ throw new NotReplicatedYetException("Not replicated yet: " + src);
|
|
|
|
+ }
|
|
|
|
+ return inodesInPath;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ LocatedBlock makeLocatedBlock(Block blk,
|
|
|
|
+ DatanodeInfo[] locs,
|
|
|
|
+ long offset) throws IOException {
|
|
|
|
+ LocatedBlock lBlk = new LocatedBlock(
|
|
|
|
+ getExtendedBlock(blk), locs, offset);
|
|
|
|
+ getBlockManager().setBlockToken(
|
|
|
|
+ lBlk, BlockTokenSecretManager.AccessMode.WRITE);
|
|
|
|
+ return lBlk;
|
|
}
|
|
}
|
|
|
|
|
|
/** @see NameNode#getAdditionalDatanode(String, ExtendedBlock, DatanodeInfo[], DatanodeInfo[], int, String) */
|
|
/** @see NameNode#getAdditionalDatanode(String, ExtendedBlock, DatanodeInfo[], DatanodeInfo[], int, String) */
|
|
@@ -2424,14 +2470,17 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|
}
|
|
}
|
|
|
|
|
|
// make sure that we still have the lease on this file.
|
|
// make sure that we still have the lease on this file.
|
|
- private INodeFileUnderConstruction checkLease(String src, String holder)
|
|
|
|
- throws LeaseExpiredException, UnresolvedLinkException {
|
|
|
|
|
|
+ private INodeFileUnderConstruction checkLease(String src, String holder)
|
|
|
|
+ throws LeaseExpiredException, UnresolvedLinkException,
|
|
|
|
+ FileNotFoundException {
|
|
assert hasReadOrWriteLock();
|
|
assert hasReadOrWriteLock();
|
|
- return checkLease(src, holder, dir.getINode(src));
|
|
|
|
|
|
+ return checkLease(src, INodeId.GRANDFATHER_INODE_ID, holder,
|
|
|
|
+ dir.getINode(src));
|
|
}
|
|
}
|
|
-
|
|
|
|
- private INodeFileUnderConstruction checkLease(String src, String holder,
|
|
|
|
- INode file) throws LeaseExpiredException {
|
|
|
|
|
|
+
|
|
|
|
+ private INodeFileUnderConstruction checkLease(String src, long fileId,
|
|
|
|
+ String holder, INode file) throws LeaseExpiredException,
|
|
|
|
+ FileNotFoundException {
|
|
assert hasReadOrWriteLock();
|
|
assert hasReadOrWriteLock();
|
|
if (file == null || !(file instanceof INodeFile)) {
|
|
if (file == null || !(file instanceof INodeFile)) {
|
|
Lease lease = leaseManager.getLease(holder);
|
|
Lease lease = leaseManager.getLease(holder);
|
|
@@ -2452,6 +2501,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|
throw new LeaseExpiredException("Lease mismatch on " + src + " owned by "
|
|
throw new LeaseExpiredException("Lease mismatch on " + src + " owned by "
|
|
+ pendingFile.getClientName() + " but is accessed by " + holder);
|
|
+ pendingFile.getClientName() + " but is accessed by " + holder);
|
|
}
|
|
}
|
|
|
|
+ INodeId.checkId(fileId, pendingFile);
|
|
return pendingFile;
|
|
return pendingFile;
|
|
}
|
|
}
|
|
|
|
|
|
@@ -2528,22 +2578,33 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
- * Allocate a block at the given pending filename
|
|
|
|
|
|
+ * Save allocated block at the given pending filename
|
|
*
|
|
*
|
|
* @param src path to the file
|
|
* @param src path to the file
|
|
* @param inodesInPath representing each of the components of src.
|
|
* @param inodesInPath representing each of the components of src.
|
|
* The last INode is the INode for the file.
|
|
* The last INode is the INode for the file.
|
|
* @throws QuotaExceededException If addition of block exceeds space quota
|
|
* @throws QuotaExceededException If addition of block exceeds space quota
|
|
*/
|
|
*/
|
|
- private Block allocateBlock(String src, INodesInPath inodesInPath,
|
|
|
|
- DatanodeDescriptor targets[]) throws IOException {
|
|
|
|
|
|
+ BlockInfo saveAllocatedBlock(String src, INodesInPath inodesInPath,
|
|
|
|
+ Block newBlock, DatanodeDescriptor targets[]) throws IOException {
|
|
|
|
+ assert hasWriteLock();
|
|
|
|
+ BlockInfo b = dir.addBlock(src, inodesInPath, newBlock, targets);
|
|
|
|
+ NameNode.stateChangeLog.info("BLOCK* allocateBlock: " + src + ". "
|
|
|
|
+ + getBlockPoolId() + " " + b);
|
|
|
|
+ for (DatanodeDescriptor dn : targets) {
|
|
|
|
+ dn.incBlocksScheduled();
|
|
|
|
+ }
|
|
|
|
+ return b;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Create new block with a unique block id and a new generation stamp.
|
|
|
|
+ */
|
|
|
|
+ Block createNewBlock() throws IOException {
|
|
assert hasWriteLock();
|
|
assert hasWriteLock();
|
|
Block b = new Block(getFSImage().getUniqueBlockId(), 0, 0);
|
|
Block b = new Block(getFSImage().getUniqueBlockId(), 0, 0);
|
|
// Increment the generation stamp for every new block.
|
|
// Increment the generation stamp for every new block.
|
|
b.setGenerationStamp(nextGenerationStamp());
|
|
b.setGenerationStamp(nextGenerationStamp());
|
|
- b = dir.addBlock(src, inodesInPath, b, targets);
|
|
|
|
- NameNode.stateChangeLog.info("BLOCK* allocateBlock: " + src + ". "
|
|
|
|
- + blockPoolId + " " + b);
|
|
|
|
return b;
|
|
return b;
|
|
}
|
|
}
|
|
|
|
|
|
@@ -5582,7 +5643,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|
@Override
|
|
@Override
|
|
public boolean isAvoidingStaleDataNodesForWrite() {
|
|
public boolean isAvoidingStaleDataNodesForWrite() {
|
|
return this.blockManager.getDatanodeManager()
|
|
return this.blockManager.getDatanodeManager()
|
|
- .isAvoidingStaleDataNodesForWrite();
|
|
|
|
|
|
+ .shouldAvoidStaleDataNodesForWrite();
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|