|
@@ -127,6 +127,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
import org.apache.hadoop.fs.permission.PermissionStatus;
|
|
|
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
|
|
import org.apache.hadoop.ha.ServiceFailedException;
|
|
|
+import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
|
import org.apache.hadoop.hdfs.DFSUtil;
|
|
|
import org.apache.hadoop.hdfs.HAUtil;
|
|
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
|
@@ -422,51 +423,73 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
-
|
|
|
- /**
|
|
|
- * Instantiates an FSNamesystem loaded from the image and edits
|
|
|
- * directories specified in the passed Configuration.
|
|
|
- *
|
|
|
- * @param conf the Configuration which specifies the storage directories
|
|
|
- * from which to load
|
|
|
- * @return an FSNamesystem which contains the loaded namespace
|
|
|
- * @throws IOException if loading fails
|
|
|
+ * Check the supplied configuration for correctness.
|
|
|
+ * @param conf Supplies the configuration to validate.
|
|
|
+ * @throws IOException if the configuration could not be queried.
|
|
|
+ * @throws IllegalArgumentException if the configuration is invalid.
|
|
|
*/
|
|
|
- public static FSNamesystem loadFromDisk(Configuration conf)
|
|
|
+ private static void checkConfiguration(Configuration conf)
|
|
|
throws IOException {
|
|
|
- Collection<URI> namespaceDirs = FSNamesystem.getNamespaceDirs(conf);
|
|
|
- List<URI> namespaceEditsDirs =
|
|
|
- FSNamesystem.getNamespaceEditsDirs(conf);
|
|
|
- return loadFromDisk(conf, namespaceDirs, namespaceEditsDirs);
|
|
|
- }
|
|
|
|
|
|
- /**
|
|
|
- * Instantiates an FSNamesystem loaded from the image and edits
|
|
|
- * directories passed.
|
|
|
- *
|
|
|
- * @param conf the Configuration which specifies the storage directories
|
|
|
- * from which to load
|
|
|
- * @param namespaceDirs directories to load the fsimages
|
|
|
- * @param namespaceEditsDirs directories to load the edits from
|
|
|
- * @return an FSNamesystem which contains the loaded namespace
|
|
|
- * @throws IOException if loading fails
|
|
|
- */
|
|
|
- public static FSNamesystem loadFromDisk(Configuration conf,
|
|
|
- Collection<URI> namespaceDirs, List<URI> namespaceEditsDirs)
|
|
|
- throws IOException {
|
|
|
+ final Collection<URI> namespaceDirs =
|
|
|
+ FSNamesystem.getNamespaceDirs(conf);
|
|
|
+ final Collection<URI> editsDirs =
|
|
|
+ FSNamesystem.getNamespaceEditsDirs(conf);
|
|
|
+ final Collection<URI> requiredEditsDirs =
|
|
|
+ FSNamesystem.getRequiredNamespaceEditsDirs(conf);
|
|
|
+ final Collection<URI> sharedEditsDirs =
|
|
|
+ FSNamesystem.getSharedEditsDirs(conf);
|
|
|
+
|
|
|
+ for (URI u : requiredEditsDirs) {
|
|
|
+ if (u.toString().compareTo(
|
|
|
+ DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_DEFAULT) == 0) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ // Each required directory must also be in editsDirs or in
|
|
|
+ // sharedEditsDirs.
|
|
|
+ if (!editsDirs.contains(u) &&
|
|
|
+ !sharedEditsDirs.contains(u)) {
|
|
|
+ throw new IllegalArgumentException(
|
|
|
+ "Required edits directory " + u.toString() + " not present in " +
|
|
|
+ DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY + ". " +
|
|
|
+ DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY + "=" +
|
|
|
+ editsDirs.toString() + "; " +
|
|
|
+ DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_REQUIRED_KEY + "=" +
|
|
|
+ requiredEditsDirs.toString() + ". " +
|
|
|
+ DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY + "=" +
|
|
|
+ sharedEditsDirs.toString() + ".");
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
if (namespaceDirs.size() == 1) {
|
|
|
LOG.warn("Only one image storage directory ("
|
|
|
+ DFS_NAMENODE_NAME_DIR_KEY + ") configured. Beware of dataloss"
|
|
|
+ " due to lack of redundant storage directories!");
|
|
|
}
|
|
|
- if (namespaceEditsDirs.size() == 1) {
|
|
|
+ if (editsDirs.size() == 1) {
|
|
|
LOG.warn("Only one namespace edits storage directory ("
|
|
|
+ DFS_NAMENODE_EDITS_DIR_KEY + ") configured. Beware of dataloss"
|
|
|
+ " due to lack of redundant storage directories!");
|
|
|
}
|
|
|
+ }
|
|
|
|
|
|
- FSImage fsImage = new FSImage(conf, namespaceDirs, namespaceEditsDirs);
|
|
|
+ /**
|
|
|
+ * Instantiates an FSNamesystem loaded from the image and edits
|
|
|
+ * directories specified in the passed Configuration.
|
|
|
+ *
|
|
|
+ * @param conf the Configuration which specifies the storage directories
|
|
|
+ * from which to load
|
|
|
+ * @return an FSNamesystem which contains the loaded namespace
|
|
|
+ * @throws IOException if loading fails
|
|
|
+ */
|
|
|
+ public static FSNamesystem loadFromDisk(Configuration conf)
|
|
|
+ throws IOException {
|
|
|
+
|
|
|
+ checkConfiguration(conf);
|
|
|
+ FSImage fsImage = new FSImage(conf,
|
|
|
+ FSNamesystem.getNamespaceDirs(conf),
|
|
|
+ FSNamesystem.getNamespaceEditsDirs(conf));
|
|
|
FSNamesystem namesystem = new FSNamesystem(conf, fsImage);
|
|
|
StartupOption startOpt = NameNode.getStartupOption(conf);
|
|
|
if (startOpt == StartupOption.RECOVER) {
|
|
@@ -913,7 +936,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|
|
"\n\t\t- use Backup Node as a persistent and up-to-date storage " +
|
|
|
"of the file system meta-data.");
|
|
|
} else if (dirNames.isEmpty()) {
|
|
|
- dirNames = Collections.singletonList("file:///tmp/hadoop/dfs/name");
|
|
|
+ dirNames = Collections.singletonList(
|
|
|
+ DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_DEFAULT);
|
|
|
}
|
|
|
return Util.stringCollectionAsURIs(dirNames);
|
|
|
}
|
|
@@ -1772,16 +1796,18 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|
|
* Create a new file entry in the namespace.
|
|
|
*
|
|
|
* For description of parameters and exceptions thrown see
|
|
|
- * {@link ClientProtocol#create()}
|
|
|
+ * {@link ClientProtocol#create()}, except it returns valid file status
|
|
|
+ * upon success
|
|
|
*/
|
|
|
- void startFile(String src, PermissionStatus permissions, String holder,
|
|
|
- String clientMachine, EnumSet<CreateFlag> flag, boolean createParent,
|
|
|
- short replication, long blockSize) throws AccessControlException,
|
|
|
- SafeModeException, FileAlreadyExistsException, UnresolvedLinkException,
|
|
|
+ HdfsFileStatus startFile(String src, PermissionStatus permissions,
|
|
|
+ String holder, String clientMachine, EnumSet<CreateFlag> flag,
|
|
|
+ boolean createParent, short replication, long blockSize)
|
|
|
+ throws AccessControlException, SafeModeException,
|
|
|
+ FileAlreadyExistsException, UnresolvedLinkException,
|
|
|
FileNotFoundException, ParentNotDirectoryException, IOException {
|
|
|
try {
|
|
|
- startFileInt(src, permissions, holder, clientMachine, flag, createParent,
|
|
|
- replication, blockSize);
|
|
|
+ return startFileInt(src, permissions, holder, clientMachine, flag,
|
|
|
+ createParent, replication, blockSize);
|
|
|
} catch (AccessControlException e) {
|
|
|
if (isAuditEnabled() && isExternalInvocation()) {
|
|
|
logAuditEvent(false, UserGroupInformation.getCurrentUser(),
|
|
@@ -1792,18 +1818,21 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private void startFileInt(String src, PermissionStatus permissions, String holder,
|
|
|
- String clientMachine, EnumSet<CreateFlag> flag, boolean createParent,
|
|
|
- short replication, long blockSize) throws AccessControlException,
|
|
|
- SafeModeException, FileAlreadyExistsException, UnresolvedLinkException,
|
|
|
+ private HdfsFileStatus startFileInt(String src, PermissionStatus permissions,
|
|
|
+ String holder, String clientMachine, EnumSet<CreateFlag> flag,
|
|
|
+ boolean createParent, short replication, long blockSize)
|
|
|
+ throws AccessControlException, SafeModeException,
|
|
|
+ FileAlreadyExistsException, UnresolvedLinkException,
|
|
|
FileNotFoundException, ParentNotDirectoryException, IOException {
|
|
|
boolean skipSync = false;
|
|
|
+ final HdfsFileStatus stat;
|
|
|
writeLock();
|
|
|
try {
|
|
|
checkOperation(OperationCategory.WRITE);
|
|
|
|
|
|
startFileInternal(src, permissions, holder, clientMachine, flag,
|
|
|
createParent, replication, blockSize);
|
|
|
+ stat = dir.getFileInfo(src, false);
|
|
|
} catch (StandbyException se) {
|
|
|
skipSync = true;
|
|
|
throw se;
|
|
@@ -1817,11 +1846,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|
|
}
|
|
|
|
|
|
if (isAuditEnabled() && isExternalInvocation()) {
|
|
|
- final HdfsFileStatus stat = dir.getFileInfo(src, false);
|
|
|
logAuditEvent(UserGroupInformation.getCurrentUser(),
|
|
|
getRemoteIp(),
|
|
|
"create", src, null, stat);
|
|
|
}
|
|
|
+ return stat;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -2192,20 +2221,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|
|
* are replicated. Will return an empty 2-elt array if we want the
|
|
|
* client to "try again later".
|
|
|
*/
|
|
|
- LocatedBlock getAdditionalBlock(String src,
|
|
|
- String clientName,
|
|
|
- ExtendedBlock previous,
|
|
|
- HashMap<Node, Node> excludedNodes
|
|
|
- )
|
|
|
+ LocatedBlock getAdditionalBlock(String src, long fileId, String clientName,
|
|
|
+ ExtendedBlock previous, HashMap<Node, Node> excludedNodes)
|
|
|
throws LeaseExpiredException, NotReplicatedYetException,
|
|
|
QuotaExceededException, SafeModeException, UnresolvedLinkException,
|
|
|
IOException {
|
|
|
- checkBlock(previous);
|
|
|
- Block previousBlock = ExtendedBlock.getLocalBlock(previous);
|
|
|
- long fileLength, blockSize;
|
|
|
+ long blockSize;
|
|
|
int replication;
|
|
|
DatanodeDescriptor clientNode = null;
|
|
|
- Block newBlock = null;
|
|
|
|
|
|
if(NameNode.stateChangeLog.isDebugEnabled()) {
|
|
|
NameNode.stateChangeLog.debug(
|
|
@@ -2213,119 +2236,61 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|
|
+src+" for "+clientName);
|
|
|
}
|
|
|
|
|
|
- writeLock();
|
|
|
+ // Part I. Analyze the state of the file with respect to the input data.
|
|
|
+ readLock();
|
|
|
try {
|
|
|
- checkOperation(OperationCategory.WRITE);
|
|
|
-
|
|
|
- if (isInSafeMode()) {
|
|
|
- throw new SafeModeException("Cannot add block to " + src, safeMode);
|
|
|
- }
|
|
|
-
|
|
|
- // have we exceeded the configured limit of fs objects.
|
|
|
- checkFsObjectLimit();
|
|
|
-
|
|
|
- INodeFileUnderConstruction pendingFile = checkLease(src, clientName);
|
|
|
- BlockInfo lastBlockInFile = pendingFile.getLastBlock();
|
|
|
- if (!Block.matchingIdAndGenStamp(previousBlock, lastBlockInFile)) {
|
|
|
- // The block that the client claims is the current last block
|
|
|
- // doesn't match up with what we think is the last block. There are
|
|
|
- // three possibilities:
|
|
|
- // 1) This is the first block allocation of an append() pipeline
|
|
|
- // which started appending exactly at a block boundary.
|
|
|
- // In this case, the client isn't passed the previous block,
|
|
|
- // so it makes the allocateBlock() call with previous=null.
|
|
|
- // We can distinguish this since the last block of the file
|
|
|
- // will be exactly a full block.
|
|
|
- // 2) This is a retry from a client that missed the response of a
|
|
|
- // prior getAdditionalBlock() call, perhaps because of a network
|
|
|
- // timeout, or because of an HA failover. In that case, we know
|
|
|
- // by the fact that the client is re-issuing the RPC that it
|
|
|
- // never began to write to the old block. Hence it is safe to
|
|
|
- // abandon it and allocate a new one.
|
|
|
- // 3) This is an entirely bogus request/bug -- we should error out
|
|
|
- // rather than potentially appending a new block with an empty
|
|
|
- // one in the middle, etc
|
|
|
-
|
|
|
- BlockInfo penultimateBlock = pendingFile.getPenultimateBlock();
|
|
|
- if (previous == null &&
|
|
|
- lastBlockInFile != null &&
|
|
|
- lastBlockInFile.getNumBytes() == pendingFile.getPreferredBlockSize() &&
|
|
|
- lastBlockInFile.isComplete()) {
|
|
|
- // Case 1
|
|
|
- if (NameNode.stateChangeLog.isDebugEnabled()) {
|
|
|
- NameNode.stateChangeLog.debug(
|
|
|
- "BLOCK* NameSystem.allocateBlock: handling block allocation" +
|
|
|
- " writing to a file with a complete previous block: src=" +
|
|
|
- src + " lastBlock=" + lastBlockInFile);
|
|
|
- }
|
|
|
- } else if (Block.matchingIdAndGenStamp(penultimateBlock, previousBlock)) {
|
|
|
- // Case 2
|
|
|
- if (lastBlockInFile.getNumBytes() != 0) {
|
|
|
- throw new IOException(
|
|
|
- "Request looked like a retry to allocate block " +
|
|
|
- lastBlockInFile + " but it already contains " +
|
|
|
- lastBlockInFile.getNumBytes() + " bytes");
|
|
|
- }
|
|
|
+ LocatedBlock[] onRetryBlock = new LocatedBlock[1];
|
|
|
+ final INode[] inodes = analyzeFileState(
|
|
|
+ src, fileId, clientName, previous, onRetryBlock).getINodes();
|
|
|
+ final INodeFileUnderConstruction pendingFile =
|
|
|
+ (INodeFileUnderConstruction) inodes[inodes.length - 1];
|
|
|
|
|
|
- // The retry case ("b" above) -- abandon the old block.
|
|
|
- NameNode.stateChangeLog.info("BLOCK* allocateBlock: " +
|
|
|
- "caught retry for allocation of a new block in " +
|
|
|
- src + ". Abandoning old block " + lastBlockInFile);
|
|
|
- dir.removeBlock(src, pendingFile, lastBlockInFile);
|
|
|
- dir.persistBlocks(src, pendingFile);
|
|
|
- } else {
|
|
|
-
|
|
|
- throw new IOException("Cannot allocate block in " + src + ": " +
|
|
|
- "passed 'previous' block " + previous + " does not match actual " +
|
|
|
- "last block in file " + lastBlockInFile);
|
|
|
- }
|
|
|
+ if(onRetryBlock[0] != null) {
|
|
|
+ // This is a retry. Just return the last block.
|
|
|
+ return onRetryBlock[0];
|
|
|
}
|
|
|
|
|
|
- // commit the last block and complete it if it has minimum replicas
|
|
|
- commitOrCompleteLastBlock(pendingFile, previousBlock);
|
|
|
-
|
|
|
- //
|
|
|
- // If we fail this, bad things happen!
|
|
|
- //
|
|
|
- if (!checkFileProgress(pendingFile, false)) {
|
|
|
- throw new NotReplicatedYetException("Not replicated yet:" + src);
|
|
|
- }
|
|
|
- fileLength = pendingFile.computeContentSummary().getLength();
|
|
|
blockSize = pendingFile.getPreferredBlockSize();
|
|
|
clientNode = pendingFile.getClientNode();
|
|
|
replication = pendingFile.getBlockReplication();
|
|
|
} finally {
|
|
|
- writeUnlock();
|
|
|
+ readUnlock();
|
|
|
}
|
|
|
|
|
|
// choose targets for the new block to be allocated.
|
|
|
- final DatanodeDescriptor targets[] = blockManager.chooseTarget(
|
|
|
+ final DatanodeDescriptor targets[] = getBlockManager().chooseTarget(
|
|
|
src, replication, clientNode, excludedNodes, blockSize);
|
|
|
|
|
|
- // Allocate a new block and record it in the INode.
|
|
|
+ // Part II.
|
|
|
+ // Allocate a new block, add it to the INode and the BlocksMap.
|
|
|
+ Block newBlock = null;
|
|
|
+ long offset;
|
|
|
writeLock();
|
|
|
try {
|
|
|
- checkOperation(OperationCategory.WRITE);
|
|
|
- if (isInSafeMode()) {
|
|
|
- throw new SafeModeException("Cannot add block to " + src, safeMode);
|
|
|
- }
|
|
|
+ // Run the full analysis again, since things could have changed
|
|
|
+ // while chooseTarget() was executing.
|
|
|
+ LocatedBlock[] onRetryBlock = new LocatedBlock[1];
|
|
|
+ INodesInPath inodesInPath =
|
|
|
+ analyzeFileState(src, fileId, clientName, previous, onRetryBlock);
|
|
|
+ INode[] inodes = inodesInPath.getINodes();
|
|
|
+ final INodeFileUnderConstruction pendingFile =
|
|
|
+ (INodeFileUnderConstruction) inodes[inodes.length - 1];
|
|
|
|
|
|
- final INodesInPath inodesInPath = dir.rootDir.getExistingPathINodes(src, true);
|
|
|
- final INode[] inodes = inodesInPath.getINodes();
|
|
|
- final INodeFileUnderConstruction pendingFile
|
|
|
- = checkLease(src, clientName, inodes[inodes.length - 1]);
|
|
|
-
|
|
|
- if (!checkFileProgress(pendingFile, false)) {
|
|
|
- throw new NotReplicatedYetException("Not replicated yet:" + src);
|
|
|
+ if(onRetryBlock[0] != null) {
|
|
|
+ // This is a retry. Just return the last block.
|
|
|
+ return onRetryBlock[0];
|
|
|
}
|
|
|
|
|
|
- // allocate new block record block locations in INode.
|
|
|
- newBlock = allocateBlock(src, inodesInPath, targets);
|
|
|
-
|
|
|
- for (DatanodeDescriptor dn : targets) {
|
|
|
- dn.incBlocksScheduled();
|
|
|
- }
|
|
|
+ // commit the last block and complete it if it has minimum replicas
|
|
|
+ commitOrCompleteLastBlock(pendingFile,
|
|
|
+ ExtendedBlock.getLocalBlock(previous));
|
|
|
+
|
|
|
+ // allocate new block, record block locations in INode.
|
|
|
+ newBlock = createNewBlock();
|
|
|
+ saveAllocatedBlock(src, inodesInPath, newBlock, targets);
|
|
|
+
|
|
|
dir.persistBlocks(src, pendingFile);
|
|
|
+ offset = pendingFile.computeFileSize(true);
|
|
|
} finally {
|
|
|
writeUnlock();
|
|
|
}
|
|
@@ -2333,10 +2298,115 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|
|
getEditLog().logSync();
|
|
|
}
|
|
|
|
|
|
- // Create next block
|
|
|
- LocatedBlock b = new LocatedBlock(getExtendedBlock(newBlock), targets, fileLength);
|
|
|
- blockManager.setBlockToken(b, BlockTokenSecretManager.AccessMode.WRITE);
|
|
|
- return b;
|
|
|
+ // Return located block
|
|
|
+ return makeLocatedBlock(newBlock, targets, offset);
|
|
|
+ }
|
|
|
+
|
|
|
+ INodesInPath analyzeFileState(String src,
|
|
|
+ long fileId,
|
|
|
+ String clientName,
|
|
|
+ ExtendedBlock previous,
|
|
|
+ LocatedBlock[] onRetryBlock)
|
|
|
+ throws IOException {
|
|
|
+ assert hasReadOrWriteLock();
|
|
|
+
|
|
|
+ checkBlock(previous);
|
|
|
+ onRetryBlock[0] = null;
|
|
|
+ checkOperation(OperationCategory.WRITE);
|
|
|
+ if (isInSafeMode()) {
|
|
|
+ throw new SafeModeException("Cannot add block to " + src, safeMode);
|
|
|
+ }
|
|
|
+
|
|
|
+ // have we exceeded the configured limit of fs objects.
|
|
|
+ checkFsObjectLimit();
|
|
|
+
|
|
|
+ Block previousBlock = ExtendedBlock.getLocalBlock(previous);
|
|
|
+ final INodesInPath inodesInPath =
|
|
|
+ dir.rootDir.getExistingPathINodes(src, true);
|
|
|
+ final INode[] inodes = inodesInPath.getINodes();
|
|
|
+ final INodeFileUnderConstruction pendingFile
|
|
|
+ = checkLease(src, fileId, clientName, inodes[inodes.length - 1]);
|
|
|
+ BlockInfo lastBlockInFile = pendingFile.getLastBlock();
|
|
|
+ if (!Block.matchingIdAndGenStamp(previousBlock, lastBlockInFile)) {
|
|
|
+ // The block that the client claims is the current last block
|
|
|
+ // doesn't match up with what we think is the last block. There are
|
|
|
+ // four possibilities:
|
|
|
+ // 1) This is the first block allocation of an append() pipeline
|
|
|
+ // which started appending exactly at a block boundary.
|
|
|
+ // In this case, the client isn't passed the previous block,
|
|
|
+ // so it makes the allocateBlock() call with previous=null.
|
|
|
+ // We can distinguish this since the last block of the file
|
|
|
+ // will be exactly a full block.
|
|
|
+ // 2) This is a retry from a client that missed the response of a
|
|
|
+ // prior getAdditionalBlock() call, perhaps because of a network
|
|
|
+ // timeout, or because of an HA failover. In that case, we know
|
|
|
+ // by the fact that the client is re-issuing the RPC that it
|
|
|
+ // never began to write to the old block. Hence it is safe to
|
|
|
+ // to return the existing block.
|
|
|
+ // 3) This is an entirely bogus request/bug -- we should error out
|
|
|
+ // rather than potentially appending a new block with an empty
|
|
|
+ // one in the middle, etc
|
|
|
+ // 4) This is a retry from a client that timed out while
|
|
|
+ // the prior getAdditionalBlock() is still being processed,
|
|
|
+ // currently working on chooseTarget().
|
|
|
+ // There are no means to distinguish between the first and
|
|
|
+ // the second attempts in Part I, because the first one hasn't
|
|
|
+ // changed the namesystem state yet.
|
|
|
+ // We run this analysis again in Part II where case 4 is impossible.
|
|
|
+
|
|
|
+ BlockInfo penultimateBlock = pendingFile.getPenultimateBlock();
|
|
|
+ if (previous == null &&
|
|
|
+ lastBlockInFile != null &&
|
|
|
+ lastBlockInFile.getNumBytes() == pendingFile.getPreferredBlockSize() &&
|
|
|
+ lastBlockInFile.isComplete()) {
|
|
|
+ // Case 1
|
|
|
+ if (NameNode.stateChangeLog.isDebugEnabled()) {
|
|
|
+ NameNode.stateChangeLog.debug(
|
|
|
+ "BLOCK* NameSystem.allocateBlock: handling block allocation" +
|
|
|
+ " writing to a file with a complete previous block: src=" +
|
|
|
+ src + " lastBlock=" + lastBlockInFile);
|
|
|
+ }
|
|
|
+ } else if (Block.matchingIdAndGenStamp(penultimateBlock, previousBlock)) {
|
|
|
+ if (lastBlockInFile.getNumBytes() != 0) {
|
|
|
+ throw new IOException(
|
|
|
+ "Request looked like a retry to allocate block " +
|
|
|
+ lastBlockInFile + " but it already contains " +
|
|
|
+ lastBlockInFile.getNumBytes() + " bytes");
|
|
|
+ }
|
|
|
+
|
|
|
+ // Case 2
|
|
|
+ // Return the last block.
|
|
|
+ NameNode.stateChangeLog.info("BLOCK* allocateBlock: " +
|
|
|
+ "caught retry for allocation of a new block in " +
|
|
|
+ src + ". Returning previously allocated block " + lastBlockInFile);
|
|
|
+ long offset = pendingFile.computeFileSize(true);
|
|
|
+ onRetryBlock[0] = makeLocatedBlock(lastBlockInFile,
|
|
|
+ ((BlockInfoUnderConstruction)lastBlockInFile).getExpectedLocations(),
|
|
|
+ offset);
|
|
|
+ return inodesInPath;
|
|
|
+ } else {
|
|
|
+ // Case 3
|
|
|
+ throw new IOException("Cannot allocate block in " + src + ": " +
|
|
|
+ "passed 'previous' block " + previous + " does not match actual " +
|
|
|
+ "last block in file " + lastBlockInFile);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // Check if the penultimate block is minimally replicated
|
|
|
+ if (!checkFileProgress(pendingFile, false)) {
|
|
|
+ throw new NotReplicatedYetException("Not replicated yet: " + src);
|
|
|
+ }
|
|
|
+ return inodesInPath;
|
|
|
+ }
|
|
|
+
|
|
|
+ LocatedBlock makeLocatedBlock(Block blk,
|
|
|
+ DatanodeInfo[] locs,
|
|
|
+ long offset) throws IOException {
|
|
|
+ LocatedBlock lBlk = new LocatedBlock(
|
|
|
+ getExtendedBlock(blk), locs, offset);
|
|
|
+ getBlockManager().setBlockToken(
|
|
|
+ lBlk, BlockTokenSecretManager.AccessMode.WRITE);
|
|
|
+ return lBlk;
|
|
|
}
|
|
|
|
|
|
/** @see NameNode#getAdditionalDatanode(String, ExtendedBlock, DatanodeInfo[], DatanodeInfo[], int, String) */
|
|
@@ -2424,14 +2494,17 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|
|
}
|
|
|
|
|
|
// make sure that we still have the lease on this file.
|
|
|
- private INodeFileUnderConstruction checkLease(String src, String holder)
|
|
|
- throws LeaseExpiredException, UnresolvedLinkException {
|
|
|
+ private INodeFileUnderConstruction checkLease(String src, String holder)
|
|
|
+ throws LeaseExpiredException, UnresolvedLinkException,
|
|
|
+ FileNotFoundException {
|
|
|
assert hasReadOrWriteLock();
|
|
|
- return checkLease(src, holder, dir.getINode(src));
|
|
|
+ return checkLease(src, INodeId.GRANDFATHER_INODE_ID, holder,
|
|
|
+ dir.getINode(src));
|
|
|
}
|
|
|
-
|
|
|
- private INodeFileUnderConstruction checkLease(String src, String holder,
|
|
|
- INode file) throws LeaseExpiredException {
|
|
|
+
|
|
|
+ private INodeFileUnderConstruction checkLease(String src, long fileId,
|
|
|
+ String holder, INode file) throws LeaseExpiredException,
|
|
|
+ FileNotFoundException {
|
|
|
assert hasReadOrWriteLock();
|
|
|
if (file == null || !(file instanceof INodeFile)) {
|
|
|
Lease lease = leaseManager.getLease(holder);
|
|
@@ -2452,6 +2525,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|
|
throw new LeaseExpiredException("Lease mismatch on " + src + " owned by "
|
|
|
+ pendingFile.getClientName() + " but is accessed by " + holder);
|
|
|
}
|
|
|
+ INodeId.checkId(fileId, pendingFile);
|
|
|
return pendingFile;
|
|
|
}
|
|
|
|
|
@@ -2528,22 +2602,33 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Allocate a block at the given pending filename
|
|
|
+ * Save allocated block at the given pending filename
|
|
|
*
|
|
|
* @param src path to the file
|
|
|
* @param inodesInPath representing each of the components of src.
|
|
|
* The last INode is the INode for the file.
|
|
|
* @throws QuotaExceededException If addition of block exceeds space quota
|
|
|
*/
|
|
|
- private Block allocateBlock(String src, INodesInPath inodesInPath,
|
|
|
- DatanodeDescriptor targets[]) throws IOException {
|
|
|
+ BlockInfo saveAllocatedBlock(String src, INodesInPath inodesInPath,
|
|
|
+ Block newBlock, DatanodeDescriptor targets[]) throws IOException {
|
|
|
+ assert hasWriteLock();
|
|
|
+ BlockInfo b = dir.addBlock(src, inodesInPath, newBlock, targets);
|
|
|
+ NameNode.stateChangeLog.info("BLOCK* allocateBlock: " + src + ". "
|
|
|
+ + getBlockPoolId() + " " + b);
|
|
|
+ for (DatanodeDescriptor dn : targets) {
|
|
|
+ dn.incBlocksScheduled();
|
|
|
+ }
|
|
|
+ return b;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Create new block with a unique block id and a new generation stamp.
|
|
|
+ */
|
|
|
+ Block createNewBlock() throws IOException {
|
|
|
assert hasWriteLock();
|
|
|
Block b = new Block(getFSImage().getUniqueBlockId(), 0, 0);
|
|
|
// Increment the generation stamp for every new block.
|
|
|
b.setGenerationStamp(nextGenerationStamp());
|
|
|
- b = dir.addBlock(src, inodesInPath, b, targets);
|
|
|
- NameNode.stateChangeLog.info("BLOCK* allocateBlock: " + src + ". "
|
|
|
- + blockPoolId + " " + b);
|
|
|
return b;
|
|
|
}
|
|
|
|
|
@@ -5582,7 +5667,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|
|
@Override
|
|
|
public boolean isAvoidingStaleDataNodesForWrite() {
|
|
|
return this.blockManager.getDatanodeManager()
|
|
|
- .isAvoidingStaleDataNodesForWrite();
|
|
|
+ .shouldAvoidStaleDataNodesForWrite();
|
|
|
}
|
|
|
|
|
|
/**
|