|
@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.*;
|
|
|
import org.apache.hadoop.hdfs.DFSUtil;
|
|
|
import org.apache.hadoop.hdfs.protocol.*;
|
|
|
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
|
|
|
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
|
|
|
import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
|
|
|
import org.apache.hadoop.hdfs.server.common.Storage;
|
|
|
import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
|
|
@@ -563,13 +564,18 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
|
|
|
}
|
|
|
List<BlockWithLocations> results = new ArrayList<BlockWithLocations>();
|
|
|
long totalSize = 0;
|
|
|
+ BlockInfo curBlock;
|
|
|
while(totalSize<size && iter.hasNext()) {
|
|
|
- totalSize += addBlock(iter.next(), results);
|
|
|
+ curBlock = iter.next();
|
|
|
+ if(!curBlock.isComplete()) continue;
|
|
|
+ totalSize += addBlock(curBlock, results);
|
|
|
}
|
|
|
if(totalSize<size) {
|
|
|
iter = node.getBlockIterator(); // start from the beginning
|
|
|
for(int i=0; i<startBlock&&totalSize<size; i++) {
|
|
|
- totalSize += addBlock(iter.next(), results);
|
|
|
+ curBlock = iter.next();
|
|
|
+ if(!curBlock.isComplete()) continue;
|
|
|
+ totalSize += addBlock(curBlock, results);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -707,14 +713,46 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
|
|
|
if (doAccessTime && isAccessTimeSupported()) {
|
|
|
dir.setTimes(src, inode, -1, now(), false);
|
|
|
}
|
|
|
- final Block[] blocks = inode.getBlocks();
|
|
|
+ final BlockInfo[] blocks = inode.getBlocks();
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("blocks = " + java.util.Arrays.asList(blocks));
|
|
|
+ }
|
|
|
if (blocks == null) {
|
|
|
return null;
|
|
|
}
|
|
|
- final List<LocatedBlock> results = blocks.length == 0?
|
|
|
- new ArrayList<LocatedBlock>(0):
|
|
|
- blockManager.getBlockLocations(blocks, offset, length, Integer.MAX_VALUE);
|
|
|
- return inode.createLocatedBlocks(results);
|
|
|
+
|
|
|
+ if (blocks.length == 0) {
|
|
|
+ return new LocatedBlocks(0, inode.isUnderConstruction(),
|
|
|
+ Collections.<LocatedBlock>emptyList(), null, false);
|
|
|
+ } else {
|
|
|
+ final long n = inode.computeFileSize(false);
|
|
|
+ final List<LocatedBlock> locatedblocks = blockManager.getBlockLocations(
|
|
|
+ blocks, offset, length, Integer.MAX_VALUE);
|
|
|
+ final BlockInfo last = inode.getLastBlock();
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("last = " + last);
|
|
|
+ }
|
|
|
+
|
|
|
+ if (!last.isComplete()) {
|
|
|
+ return new LocatedBlocks(n, inode.isUnderConstruction(), locatedblocks,
|
|
|
+ blockManager.getBlockLocation(last, n), false);
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ return new LocatedBlocks(n, inode.isUnderConstruction(), locatedblocks,
|
|
|
+ blockManager.getBlockLocation(last, n-last.getNumBytes()), true);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /** Create a LocatedBlock and, if access tokens are enabled, attach a READ access token. */
|
|
|
+ LocatedBlock createLocatedBlock(final Block b, final DatanodeInfo[] locations,
|
|
|
+ final long offset, final boolean corrupt) throws IOException {
|
|
|
+ final LocatedBlock lb = new LocatedBlock(b, locations, offset, corrupt);
|
|
|
+ if (isAccessTokenEnabled) {
|
|
|
+ lb.setAccessToken(accessTokenHandler.generateToken(b.getBlockId(),
|
|
|
+ EnumSet.of(AccessTokenHandler.AccessMode.READ)));
|
|
|
+ }
|
|
|
+ return lb;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -912,40 +950,45 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
|
|
|
// If the file is under construction , then it must be in our
|
|
|
// leases. Find the appropriate lease record.
|
|
|
//
|
|
|
- Lease lease = leaseManager.getLease(holder);
|
|
|
- //
|
|
|
- // We found the lease for this file. And surprisingly the original
|
|
|
- // holder is trying to recreate this file. This should never occur.
|
|
|
- //
|
|
|
- if (lease != null) {
|
|
|
+ Lease lease = leaseManager.getLeaseByPath(src);
|
|
|
+ if (lease == null) {
|
|
|
throw new AlreadyBeingCreatedException(
|
|
|
- "failed to create file " + src + " for " + holder +
|
|
|
- " on client " + clientMachine +
|
|
|
- " because current leaseholder is trying to recreate file.");
|
|
|
+ "failed to create file " + src + " for " + holder +
|
|
|
+ " on client " + clientMachine +
|
|
|
+ " because pendingCreates is non-null but no leases found.");
|
|
|
}
|
|
|
//
|
|
|
- // Find the original holder.
|
|
|
+ // We found the lease for this file. And surprisingly the original
|
|
|
+ // holder is trying to recreate this file. This should never occur.
|
|
|
//
|
|
|
- lease = leaseManager.getLease(pendingFile.clientName);
|
|
|
- if (lease == null) {
|
|
|
+ if (lease.getHolder().equals(holder)) {
|
|
|
throw new AlreadyBeingCreatedException(
|
|
|
- "failed to create file " + src + " for " + holder +
|
|
|
- " on client " + clientMachine +
|
|
|
- " because pendingCreates is non-null but no leases found.");
|
|
|
+ "failed to create file " + src + " for " + holder +
|
|
|
+ " on client " + clientMachine +
|
|
|
+ " because current leaseholder is trying to recreate file.");
|
|
|
}
|
|
|
+ assert lease.getHolder().equals(pendingFile.getClientName()) :
|
|
|
+ "Current lease holder " + lease.getHolder() +
|
|
|
+ " does not match file creator " + pendingFile.getClientName();
|
|
|
//
|
|
|
+ // Current lease holder is different from the requester.
|
|
|
// If the original holder has not renewed in the last SOFTLIMIT
|
|
|
- // period, then start lease recovery.
|
|
|
+ // period, then start lease recovery, otherwise fail.
|
|
|
//
|
|
|
if (lease.expiredSoftLimit()) {
|
|
|
LOG.info("startFile: recover lease " + lease + ", src=" + src);
|
|
|
- internalReleaseLease(lease, src);
|
|
|
- }
|
|
|
- throw new AlreadyBeingCreatedException("failed to create file " + src + " for " + holder +
|
|
|
- " on client " + clientMachine +
|
|
|
- ", because this file is already being created by " +
|
|
|
- pendingFile.getClientName() +
|
|
|
- " on " + pendingFile.getClientMachine());
|
|
|
+ boolean isClosed = internalReleaseLease(lease, src, null);
|
|
|
+ if(!isClosed)
|
|
|
+ throw new RecoveryInProgressException(
|
|
|
+ "Failed to close file " + src +
|
|
|
+ ". Lease recovery is in progress. Try again later.");
|
|
|
+
|
|
|
+ } else
|
|
|
+ throw new AlreadyBeingCreatedException("failed to create file " +
|
|
|
+ src + " for " + holder + " on client " + clientMachine +
|
|
|
+ ", because this file is already being created by " +
|
|
|
+ pendingFile.getClientName() +
|
|
|
+ " on " + pendingFile.getClientMachine());
|
|
|
}
|
|
|
|
|
|
try {
|
|
@@ -998,7 +1041,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
|
|
|
clientMachine,
|
|
|
clientNode);
|
|
|
dir.replaceNode(src, node, cons);
|
|
|
- leaseManager.addLease(cons.clientName, src);
|
|
|
+ leaseManager.addLease(cons.getClientName(), src);
|
|
|
|
|
|
} else {
|
|
|
// Now we can add the name to the filesystem. This file has no
|
|
@@ -1014,7 +1057,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
|
|
|
throw new IOException("DIR* NameSystem.startFile: " +
|
|
|
"Unable to add file to namespace.");
|
|
|
}
|
|
|
- leaseManager.addLease(newNode.clientName, src);
|
|
|
+ leaseManager.addLease(newNode.getClientName(), src);
|
|
|
if (NameNode.stateChangeLog.isDebugEnabled()) {
|
|
|
NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: "
|
|
|
+"add "+src+" to namespace for "+holder);
|
|
@@ -1048,40 +1091,36 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
|
|
|
LocatedBlock lb = null;
|
|
|
synchronized (this) {
|
|
|
INodeFileUnderConstruction file = (INodeFileUnderConstruction)dir.getFileINode(src);
|
|
|
-
|
|
|
- BlockInfo[] blocks = file.getBlocks();
|
|
|
- if (blocks != null && blocks.length > 0) {
|
|
|
- BlockInfo last = blocks[blocks.length-1];
|
|
|
- // this is a redundant search in blocksMap
|
|
|
- // should be resolved by the new implementation of append
|
|
|
- BlockInfo storedBlock = blockManager.getStoredBlock(last);
|
|
|
- assert last == storedBlock : "last block should be in the blocksMap";
|
|
|
- if (file.getPreferredBlockSize() > storedBlock.getNumBytes()) {
|
|
|
+ BlockInfo lastBlock = file.getLastBlock();
|
|
|
+ if (lastBlock != null) {
|
|
|
+ assert lastBlock == blockManager.getStoredBlock(lastBlock) :
|
|
|
+ "last block of the file is not in blocksMap";
|
|
|
+ if (file.getPreferredBlockSize() > lastBlock.getNumBytes()) {
|
|
|
long fileLength = file.computeContentSummary().getLength();
|
|
|
- DatanodeDescriptor[] targets = blockManager.getNodes(storedBlock);
|
|
|
+ DatanodeDescriptor[] targets = blockManager.getNodes(lastBlock);
|
|
|
// remove the replica locations of this block from the node
|
|
|
for (int i = 0; i < targets.length; i++) {
|
|
|
- targets[i].removeBlock(storedBlock);
|
|
|
+ targets[i].removeBlock(lastBlock);
|
|
|
}
|
|
|
- // set the locations of the last block in the lease record
|
|
|
- file.setLastBlock(storedBlock, targets);
|
|
|
+ // convert last block to under-construction and set its locations
|
|
|
+ blockManager.convertLastBlockToUnderConstruction(file, targets);
|
|
|
|
|
|
- lb = new LocatedBlock(last, targets,
|
|
|
- fileLength-storedBlock.getNumBytes());
|
|
|
+ lb = new LocatedBlock(lastBlock, targets,
|
|
|
+ fileLength-lastBlock.getNumBytes());
|
|
|
if (isAccessTokenEnabled) {
|
|
|
lb.setAccessToken(accessTokenHandler.generateToken(lb.getBlock()
|
|
|
.getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.WRITE)));
|
|
|
}
|
|
|
|
|
|
// Remove block from replication queue.
|
|
|
- blockManager.updateNeededReplications(last, 0, 0);
|
|
|
+ blockManager.updateNeededReplications(lastBlock, 0, 0);
|
|
|
|
|
|
// remove this block from the list of pending blocks to be deleted.
|
|
|
// This reduces the possibility of triggering HADOOP-1349.
|
|
|
//
|
|
|
for (DatanodeDescriptor dd : targets) {
|
|
|
String datanodeId = dd.getStorageID();
|
|
|
- blockManager.removeFromInvalidates(datanodeId, last);
|
|
|
+ blockManager.removeFromInvalidates(datanodeId, lastBlock);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -1115,7 +1154,8 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
|
|
|
* client to "try again later".
|
|
|
*/
|
|
|
public LocatedBlock getAdditionalBlock(String src,
|
|
|
- String clientName
|
|
|
+ String clientName,
|
|
|
+ Block previous
|
|
|
) throws IOException {
|
|
|
long fileLength, blockSize;
|
|
|
int replication;
|
|
@@ -1135,6 +1175,9 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
|
|
|
|
|
|
INodeFileUnderConstruction pendingFile = checkLease(src, clientName);
|
|
|
|
|
|
+ // commit the last block
|
|
|
+ blockManager.commitLastBlock(pendingFile, previous);
|
|
|
+
|
|
|
//
|
|
|
// If we fail this, bad things happen!
|
|
|
//
|
|
@@ -1168,9 +1211,11 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
|
|
|
throw new NotReplicatedYetException("Not replicated yet:" + src);
|
|
|
}
|
|
|
|
|
|
+ // complete the penultimate block
|
|
|
+ blockManager.completeBlock(pendingFile, pendingFile.numBlocks()-2);
|
|
|
+
|
|
|
// allocate new block record block locations in INode.
|
|
|
- newBlock = allocateBlock(src, pathINodes);
|
|
|
- pendingFile.setTargets(targets);
|
|
|
+ newBlock = allocateBlock(src, pathINodes, targets);
|
|
|
|
|
|
for (DatanodeDescriptor dn : targets) {
|
|
|
dn.incBlocksScheduled();
|
|
@@ -1250,15 +1295,18 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
|
|
|
COMPLETE_SUCCESS
|
|
|
}
|
|
|
|
|
|
- public CompleteFileStatus completeFile(String src, String holder) throws IOException {
|
|
|
- CompleteFileStatus status = completeFileInternal(src, holder);
|
|
|
+ public CompleteFileStatus completeFile(String src,
|
|
|
+ String holder,
|
|
|
+ Block last) throws IOException {
|
|
|
+ CompleteFileStatus status = completeFileInternal(src, holder, last);
|
|
|
getEditLog().logSync();
|
|
|
return status;
|
|
|
}
|
|
|
|
|
|
-
|
|
|
- private synchronized CompleteFileStatus completeFileInternal(String src,
|
|
|
- String holder) throws IOException {
|
|
|
+ private synchronized CompleteFileStatus completeFileInternal(
|
|
|
+ String src,
|
|
|
+ String holder,
|
|
|
+ Block last) throws IOException {
|
|
|
NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " + src + " for " + holder);
|
|
|
if (isInSafeMode())
|
|
|
throw new SafeModeException("Cannot complete file " + src, safeMode);
|
|
@@ -1279,7 +1327,12 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
|
|
|
("from " + pendingFile.getClientMachine()))
|
|
|
);
|
|
|
return CompleteFileStatus.OPERATION_FAILED;
|
|
|
- } else if (!checkFileProgress(pendingFile, true)) {
|
|
|
+ }
|
|
|
+
|
|
|
+ // commit the last block
|
|
|
+ blockManager.commitLastBlock(pendingFile, last);
|
|
|
+
|
|
|
+ if (!checkFileProgress(pendingFile, true)) {
|
|
|
return CompleteFileStatus.STILL_WAITING;
|
|
|
}
|
|
|
|
|
@@ -1312,13 +1365,15 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
|
|
|
* @param inodes INode representing each of the components of src.
|
|
|
* <code>inodes[inodes.length-1]</code> is the INode for the file.
|
|
|
*/
|
|
|
- private Block allocateBlock(String src, INode[] inodes) throws IOException {
|
|
|
+ private Block allocateBlock(String src,
|
|
|
+ INode[] inodes,
|
|
|
+ DatanodeDescriptor targets[]) throws IOException {
|
|
|
Block b = new Block(FSNamesystem.randBlockId.nextLong(), 0, 0);
|
|
|
while(isValidBlock(b)) {
|
|
|
b.setBlockId(FSNamesystem.randBlockId.nextLong());
|
|
|
}
|
|
|
b.setGenerationStamp(getGenerationStamp());
|
|
|
- b = dir.addBlock(src, inodes, b);
|
|
|
+ b = dir.addBlock(src, inodes, b, targets);
|
|
|
NameNode.stateChangeLog.info("BLOCK* NameSystem.allocateBlock: "
|
|
|
+src+ ". "+b);
|
|
|
return b;
|
|
@@ -1329,12 +1384,12 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
|
|
|
* replicated. If not, return false. If checkall is true, then check
|
|
|
* all blocks, otherwise check only penultimate block.
|
|
|
*/
|
|
|
- synchronized boolean checkFileProgress(INodeFile v, boolean checkall) {
|
|
|
+ synchronized boolean checkFileProgress(INodeFile v, boolean checkall) throws IOException {
|
|
|
if (checkall) {
|
|
|
//
|
|
|
// check all blocks of the file.
|
|
|
//
|
|
|
- for (Block block: v.getBlocks()) {
|
|
|
+ for (BlockInfo block: v.getBlocks()) {
|
|
|
if (!blockManager.checkMinReplication(block)) {
|
|
|
return false;
|
|
|
}
|
|
@@ -1343,7 +1398,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
|
|
|
//
|
|
|
// check the penultimate block of this file
|
|
|
//
|
|
|
- Block b = v.getPenultimateBlock();
|
|
|
+ BlockInfo b = v.getPenultimateBlock();
|
|
|
if (b != null && !blockManager.checkMinReplication(b)) {
|
|
|
return false;
|
|
|
}
|
|
@@ -1614,20 +1669,31 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
|
|
|
* Move a file that is being written to be immutable.
|
|
|
* @param src The filename
|
|
|
* @param lease The lease for the client creating the file
|
|
|
+ * @param recoveryLeaseHolder reassign lease to this holder if the last block
|
|
|
+ * needs recovery; keep current holder if null.
|
|
|
+ * @throws AlreadyBeingCreatedException if file is waiting to achieve minimal
|
|
|
+ * replication;<br>
|
|
|
+ * RecoveryInProgressException if lease recovery is in progress.<br>
|
|
|
+ * IOException in case of an error.
|
|
|
+ * @return true if file has been successfully finalized and closed or
|
|
|
+ * false if block recovery has been initiated
|
|
|
*/
|
|
|
- void internalReleaseLease(Lease lease, String src) throws IOException {
|
|
|
+ boolean internalReleaseLease(
|
|
|
+ Lease lease, String src, String recoveryLeaseHolder)
|
|
|
+ throws AlreadyBeingCreatedException,
|
|
|
+ IOException {
|
|
|
LOG.info("Recovering lease=" + lease + ", src=" + src);
|
|
|
|
|
|
INodeFile iFile = dir.getFileINode(src);
|
|
|
if (iFile == null) {
|
|
|
- final String message = "DIR* NameSystem.internalReleaseCreate: "
|
|
|
+ final String message = "DIR* NameSystem.internalReleaseLease: "
|
|
|
+ "attempt to release a create lock on "
|
|
|
+ src + " file does not exist.";
|
|
|
NameNode.stateChangeLog.warn(message);
|
|
|
throw new IOException(message);
|
|
|
}
|
|
|
if (!iFile.isUnderConstruction()) {
|
|
|
- final String message = "DIR* NameSystem.internalReleaseCreate: "
|
|
|
+ final String message = "DIR* NameSystem.internalReleaseLease: "
|
|
|
+ "attempt to release a create lock on "
|
|
|
+ src + " but file is already closed.";
|
|
|
NameNode.stateChangeLog.warn(message);
|
|
@@ -1635,39 +1701,123 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
|
|
|
}
|
|
|
|
|
|
INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) iFile;
|
|
|
+ int nrBlocks = pendingFile.numBlocks();
|
|
|
+ BlockInfo[] blocks = pendingFile.getBlocks();
|
|
|
+
|
|
|
+ int nrCompleteBlocks;
|
|
|
+ BlockInfo curBlock = null;
|
|
|
+ for(nrCompleteBlocks = 0; nrCompleteBlocks < nrBlocks; nrCompleteBlocks++) {
|
|
|
+ curBlock = blocks[nrCompleteBlocks];
|
|
|
+ if(!curBlock.isComplete())
|
|
|
+ break;
|
|
|
+ assert blockManager.checkMinReplication(curBlock) :
|
|
|
+ "A COMPLETE block is not minimally replicated in " + src;
|
|
|
+ }
|
|
|
+
|
|
|
+ // If there are no incomplete blocks associated with this file,
|
|
|
+ // then reap lease immediately and close the file.
|
|
|
+ if(nrCompleteBlocks == nrBlocks) {
|
|
|
+ finalizeINodeFileUnderConstruction(src, pendingFile);
|
|
|
+ NameNode.stateChangeLog.warn("BLOCK*"
|
|
|
+ + " internalReleaseLease: All existing blocks are COMPLETE,"
|
|
|
+ + " lease removed, file closed.");
|
|
|
+ return true; // closed!
|
|
|
+ }
|
|
|
+
|
|
|
+ // Only the last and the penultimate blocks may be in non COMPLETE state.
|
|
|
+ // If the penultimate block is not COMPLETE, then it must be COMMITTED.
|
|
|
+ if(nrCompleteBlocks < nrBlocks - 2 ||
|
|
|
+ nrCompleteBlocks == nrBlocks - 2 &&
|
|
|
+ curBlock.getBlockUCState() != BlockUCState.COMMITTED) {
|
|
|
+ final String message = "DIR* NameSystem.internalReleaseLease: "
|
|
|
+ + "attempt to release a create lock on "
|
|
|
+ + src + " but file is already closed.";
|
|
|
+ NameNode.stateChangeLog.warn(message);
|
|
|
+ throw new IOException(message);
|
|
|
+ }
|
|
|
|
|
|
- // Initialize lease recovery for pendingFile. If there are no blocks
|
|
|
- // associated with this file, then reap lease immediately. Otherwise
|
|
|
- // renew the lease and trigger lease recovery.
|
|
|
- if (pendingFile.getTargets() == null ||
|
|
|
- pendingFile.getTargets().length == 0) {
|
|
|
- if (pendingFile.getBlocks().length == 0) {
|
|
|
+ // now we know that the last block is not COMPLETE, and
|
|
|
+ // that the penultimate block, if it exists, is either COMPLETE or COMMITTED
|
|
|
+ BlockInfoUnderConstruction lastBlock = pendingFile.getLastBlock();
|
|
|
+ BlockUCState lastBlockState = lastBlock.getBlockUCState();
|
|
|
+ BlockInfo penultimateBlock = pendingFile.getPenultimateBlock();
|
|
|
+ BlockUCState penultimateBlockState = (penultimateBlock == null ?
|
|
|
+ BlockUCState.COMPLETE : penultimateBlock.getBlockUCState());
|
|
|
+ assert penultimateBlockState == BlockUCState.COMPLETE ||
|
|
|
+ penultimateBlockState == BlockUCState.COMMITTED :
|
|
|
+ "Unexpected state of penultimate block in " + src;
|
|
|
+
|
|
|
+ switch(lastBlockState) {
|
|
|
+ case COMPLETE:
|
|
|
+ assert false : "Already checked that the last block is incomplete";
|
|
|
+ break;
|
|
|
+ case COMMITTED:
|
|
|
+ // Close file if committed blocks are minimally replicated
|
|
|
+ if(blockManager.checkMinReplication(penultimateBlock) &&
|
|
|
+ blockManager.checkMinReplication(lastBlock)) {
|
|
|
finalizeINodeFileUnderConstruction(src, pendingFile);
|
|
|
NameNode.stateChangeLog.warn("BLOCK*"
|
|
|
- + " internalReleaseLease: No blocks found, lease removed.");
|
|
|
- return;
|
|
|
+ + " internalReleaseLease: Committed blocks are minimally replicated,"
|
|
|
+ + " lease removed, file closed.");
|
|
|
+ return true; // closed!
|
|
|
}
|
|
|
- // setup the Inode.targets for the last block from the blockManager
|
|
|
- //
|
|
|
- BlockInfo[] blocks = pendingFile.getBlocks();
|
|
|
- BlockInfo last = blocks[blocks.length-1];
|
|
|
- DatanodeDescriptor[] targets = blockManager.getNodes(last);
|
|
|
- pendingFile.setTargets(targets);
|
|
|
+ // Cannot close file right now, since some blocks
|
|
|
+ // are not yet minimally replicated.
|
|
|
+ // This may potentially cause infinite loop in lease recovery
|
|
|
+ // if there are no valid replicas on data-nodes.
|
|
|
+ String message = "DIR* NameSystem.internalReleaseLease: " +
|
|
|
+ "Failed to release lease for file " + src +
|
|
|
+ ". Committed blocks are waiting to be minimally replicated." +
|
|
|
+ " Try again later.";
|
|
|
+ NameNode.stateChangeLog.warn(message);
|
|
|
+ throw new AlreadyBeingCreatedException(message);
|
|
|
+ case UNDER_CONSTRUCTION:
|
|
|
+ case UNDER_RECOVERY:
|
|
|
+ // setup the last block locations from the blockManager if not known
|
|
|
+ if(lastBlock.getNumExpectedLocations() == 0)
|
|
|
+ lastBlock.setExpectedLocations(blockManager.getNodes(lastBlock));
|
|
|
+ // start recovery of the last block for this file
|
|
|
+ long blockRecoveryId = nextGenerationStamp();
|
|
|
+ lease = reassignLease(lease, src, recoveryLeaseHolder, pendingFile);
|
|
|
+ lastBlock.initializeBlockRecovery(blockRecoveryId);
|
|
|
+ leaseManager.renewLease(lease);
|
|
|
+ // Cannot close file right now, since the last block requires recovery.
|
|
|
+ // This may potentially cause infinite loop in lease recovery
|
|
|
+ // if there are no valid replicas on data-nodes.
|
|
|
+ NameNode.stateChangeLog.warn(
|
|
|
+ "DIR* NameSystem.internalReleaseLease: " +
|
|
|
+ "File " + src + " has not been closed." +
|
|
|
+ " Lease recovery is in progress. " +
|
|
|
+ "RecoveryId = " + blockRecoveryId + " for block " + lastBlock);
|
|
|
+ break;
|
|
|
}
|
|
|
- // start lease recovery of the last block for this file.
|
|
|
- pendingFile.assignPrimaryDatanode();
|
|
|
- leaseManager.renewLease(lease);
|
|
|
+ return false;
|
|
|
}
|
|
|
|
|
|
- private void finalizeINodeFileUnderConstruction(String src,
|
|
|
+ Lease reassignLease(Lease lease, String src, String newHolder,
|
|
|
+ INodeFileUnderConstruction pendingFile) {
|
|
|
+ if(newHolder == null)
|
|
|
+ return lease;
|
|
|
+ pendingFile.setClientName(newHolder);
|
|
|
+ return leaseManager.reassignLease(lease, src, newHolder);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ private void finalizeINodeFileUnderConstruction(
|
|
|
+ String src,
|
|
|
INodeFileUnderConstruction pendingFile) throws IOException {
|
|
|
- leaseManager.removeLease(pendingFile.clientName, src);
|
|
|
+ leaseManager.removeLease(pendingFile.getClientName(), src);
|
|
|
+
|
|
|
+ // complete the penultimate block
|
|
|
+ blockManager.completeBlock(pendingFile, pendingFile.numBlocks()-2);
|
|
|
|
|
|
// The file is no longer pending.
|
|
|
- // Create permanent INode, update blockmap
|
|
|
+ // Create permanent INode, update blocks
|
|
|
INodeFile newFile = pendingFile.convertToInodeFile();
|
|
|
dir.replaceNode(src, pendingFile, newFile);
|
|
|
|
|
|
+ // complete last block of the file
|
|
|
+ blockManager.completeBlock(newFile, newFile.numBlocks()-1);
|
|
|
// close file and persist block allocations for this file
|
|
|
dir.closeFile(src, newFile);
|
|
|
|
|
@@ -1690,11 +1840,20 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
|
|
|
throw new IOException("Block (=" + lastblock + ") not found");
|
|
|
}
|
|
|
INodeFile iFile = oldblockinfo.getINode();
|
|
|
- if (!iFile.isUnderConstruction()) {
|
|
|
+ if (!iFile.isUnderConstruction() || oldblockinfo.isComplete()) {
|
|
|
throw new IOException("Unexpected block (=" + lastblock
|
|
|
+ ") since the file (=" + iFile.getLocalName()
|
|
|
+ ") is not under construction");
|
|
|
}
|
|
|
+
|
|
|
+ long recoveryId =
|
|
|
+ ((BlockInfoUnderConstruction)oldblockinfo).getBlockRecoveryId();
|
|
|
+ if(recoveryId != newgenerationstamp) {
|
|
|
+ throw new IOException("The recovery id " + newgenerationstamp
|
|
|
+ + " does not match current recovery id "
|
|
|
+ + recoveryId + " for block " + lastblock);
|
|
|
+ }
|
|
|
+
|
|
|
INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)iFile;
|
|
|
|
|
|
|
|
@@ -1703,12 +1862,15 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
|
|
|
blockManager.removeBlockFromMap(oldblockinfo);
|
|
|
|
|
|
if (deleteblock) {
|
|
|
- pendingFile.removeBlock(lastblock);
|
|
|
+ pendingFile.removeLastBlock(lastblock);
|
|
|
}
|
|
|
else {
|
|
|
// update last block, construct newblockinfo and add it to the blocks map
|
|
|
lastblock.set(lastblock.getBlockId(), newlength, newgenerationstamp);
|
|
|
- final BlockInfo newblockinfo = blockManager.addINode(lastblock, pendingFile);
|
|
|
+ BlockInfoUnderConstruction newblockinfo =
|
|
|
+ new BlockInfoUnderConstruction(
|
|
|
+ lastblock, pendingFile.getReplication());
|
|
|
+ blockManager.addINode(newblockinfo, pendingFile);
|
|
|
|
|
|
// find the DatanodeDescriptor objects
|
|
|
// There should be no locations in the blockManager till now because the
|
|
@@ -1727,11 +1889,9 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
|
|
|
for (int i = 0; i < descriptors.length; i++) {
|
|
|
descriptors[i].addBlock(newblockinfo);
|
|
|
}
|
|
|
- pendingFile.setLastBlock(newblockinfo, null);
|
|
|
- } else {
|
|
|
- // add locations into the INodeUnderConstruction
|
|
|
- pendingFile.setLastBlock(newblockinfo, descriptors);
|
|
|
}
|
|
|
+ // add locations into the INodeUnderConstruction
|
|
|
+ pendingFile.setLastBlock(newblockinfo, descriptors);
|
|
|
}
|
|
|
|
|
|
// If this commit does not want to close the file, persist
|
|
@@ -1745,7 +1905,10 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
|
|
|
LOG.info("commitBlockSynchronization(" + lastblock + ") successful");
|
|
|
return;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+ // commit the last block
|
|
|
+ blockManager.commitLastBlock(pendingFile, lastblock);
|
|
|
+
|
|
|
//remove lease, close file
|
|
|
finalizeINodeFileUnderConstruction(src, pendingFile);
|
|
|
getEditLog().logSync();
|
|
@@ -3342,7 +3505,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
|
|
|
void setBlockTotal() {
|
|
|
if (safeMode == null)
|
|
|
return;
|
|
|
- safeMode.setBlockTotal((int)getBlocksTotal());
|
|
|
+ safeMode.setBlockTotal((int)getCompleteBlocksTotal());
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -3352,6 +3515,33 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
|
|
|
return blockManager.getTotalBlocks();
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Get the total number of COMPLETE blocks in the system.
|
|
|
+ * For safe mode only complete blocks are counted.
|
|
|
+ */
|
|
|
+ long getCompleteBlocksTotal() {
|
|
|
+ // Calculate number of blocks under construction
|
|
|
+ long numUCBlocks = 0;
|
|
|
+ for (Lease lease : leaseManager.getSortedLeases()) {
|
|
|
+ for(String path : lease.getPaths()) {
|
|
|
+ INode node = dir.getFileINode(path);
|
|
|
+ assert node != null : "Found a lease for nonexisting file.";
|
|
|
+ assert node.isUnderConstruction() :
|
|
|
+ "Found a lease for file that is not under construction.";
|
|
|
+ INodeFileUnderConstruction cons = (INodeFileUnderConstruction) node;
|
|
|
+ BlockInfo[] blocks = cons.getBlocks();
|
|
|
+ if(blocks == null)
|
|
|
+ continue;
|
|
|
+ for(BlockInfo b : blocks) {
|
|
|
+ if(!b.isComplete())
|
|
|
+ numUCBlocks++;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ LOG.info("Number of blocks under construction: " + numUCBlocks);
|
|
|
+ return getBlocksTotal() - numUCBlocks;
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Enter safe mode manually.
|
|
|
* @throws IOException
|
|
@@ -3671,29 +3861,129 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
|
|
|
return gs;
|
|
|
}
|
|
|
|
|
|
+ private INodeFileUnderConstruction checkUCBlock(Block block, String clientName)
|
|
|
+ throws IOException {
|
|
|
+ // check safe mode
|
|
|
+ if (isInSafeMode())
|
|
|
+ throw new SafeModeException("Cannot get a new generation stamp and an " +
|
|
|
+ "access token for block " + block, safeMode);
|
|
|
+
|
|
|
+ // check stored block state
|
|
|
+ BlockInfo storedBlock = blockManager.getStoredBlock(block);
|
|
|
+ if (storedBlock == null ||
|
|
|
+ storedBlock.getBlockUCState() != BlockUCState.UNDER_CONSTRUCTION) {
|
|
|
+ throw new IOException(block +
|
|
|
+ " does not exist or is not under Construction" + storedBlock);
|
|
|
+ }
|
|
|
+
|
|
|
+ // check file inode
|
|
|
+ INodeFile file = storedBlock.getINode();
|
|
|
+ if (file==null || !file.isUnderConstruction()) {
|
|
|
+ throw new IOException("The file " + storedBlock +
|
|
|
+ " is belonged to does not exist or it is not under construction.");
|
|
|
+ }
|
|
|
+
|
|
|
+ // check lease
|
|
|
+ INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)file;
|
|
|
+ if (clientName == null || !clientName.equals(pendingFile.getClientName())) {
|
|
|
+ throw new LeaseExpiredException("Lease mismatch: " + block +
|
|
|
+ " is accessed by a non lease holder " + clientName);
|
|
|
+ }
|
|
|
+
|
|
|
+ return pendingFile;
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
- * Verifies that the block is associated with a file that has a lease.
|
|
|
- * Increments, logs and then returns the stamp
|
|
|
+ * Get a new generation stamp together with an access token for
|
|
|
+ * a block under construction
|
|
|
+ *
|
|
|
+ * This method is called for recovering a failed pipeline or setting up
|
|
|
+ * a pipeline to append to a block.
|
|
|
+ *
|
|
|
+ * @param block a block
|
|
|
+ * @param clientName the name of a client
|
|
|
+ * @return a located block with a new generation stamp and an access token
|
|
|
+ * @throws IOException if any error occurs
|
|
|
*/
|
|
|
- synchronized long nextGenerationStampForBlock(Block block) throws IOException {
|
|
|
- BlockInfo storedBlock = blockManager.getStoredBlock(block);
|
|
|
- if (storedBlock == null) {
|
|
|
- String msg = block + " is already commited, storedBlock == null.";
|
|
|
- LOG.info(msg);
|
|
|
- throw new IOException(msg);
|
|
|
+ synchronized LocatedBlock updateBlockForPipeline(Block block,
|
|
|
+ String clientName) throws IOException {
|
|
|
+ // check validity of parameters
|
|
|
+ checkUCBlock(block, clientName);
|
|
|
+
|
|
|
+ // get a new generation stamp and an access token
|
|
|
+ block.setGenerationStamp(nextGenerationStamp());
|
|
|
+ LocatedBlock locatedBlock = new LocatedBlock(block, new DatanodeInfo[0]);
|
|
|
+ if (isAccessTokenEnabled) {
|
|
|
+ locatedBlock.setAccessToken(accessTokenHandler.generateToken(
|
|
|
+ block.getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.WRITE)));
|
|
|
}
|
|
|
- INodeFile fileINode = storedBlock.getINode();
|
|
|
- if (!fileINode.isUnderConstruction()) {
|
|
|
- String msg = block + " is already commited, !fileINode.isUnderConstruction().";
|
|
|
- LOG.info(msg);
|
|
|
+ return locatedBlock;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Update a pipeline for a block under construction
|
|
|
+ *
|
|
|
+ * @param clientName the name of the client
|
|
|
+ * @param oldBlock an old block
|
|
|
+ * @param newBlock a new block with a new generation stamp and length
|
|
|
+ * @param newNodes datanodes in the pipeline
|
|
|
+ * @throws IOException if any error occurs
|
|
|
+ */
|
|
|
+ synchronized void updatePipeline(String clientName, Block oldBlock,
|
|
|
+ Block newBlock, DatanodeID[] newNodes)
|
|
|
+ throws IOException {
|
|
|
+ LOG.info("updatePipeline(block=" + oldBlock
|
|
|
+ + ", newGenerationStamp=" + newBlock.getGenerationStamp()
|
|
|
+ + ", newLength=" + newBlock.getNumBytes()
|
|
|
+ + ", newNodes=" + Arrays.asList(newNodes)
|
|
|
+ + ", clientName=" + clientName
|
|
|
+ + ")");
|
|
|
+
|
|
|
+ // check the validity of the block and lease holder name
|
|
|
+ final INodeFileUnderConstruction pendingFile =
|
|
|
+ checkUCBlock(oldBlock, clientName);
|
|
|
+ final BlockInfo oldblockinfo = pendingFile.getLastBlock();
|
|
|
+
|
|
|
+ // check new GS & length: this is not expected
|
|
|
+ if (newBlock.getGenerationStamp() <= oldblockinfo.getGenerationStamp() ||
|
|
|
+ newBlock.getNumBytes() < oldblockinfo.getNumBytes()) {
|
|
|
+ String msg = "Update " + oldBlock + " (len = " +
|
|
|
+ oldblockinfo.getNumBytes() + ") to an older state: " + newBlock +
|
|
|
+ " (len = " + newBlock.getNumBytes() +")";
|
|
|
+ LOG.warn(msg);
|
|
|
throw new IOException(msg);
|
|
|
}
|
|
|
- if (!((INodeFileUnderConstruction)fileINode).setLastRecoveryTime(now())) {
|
|
|
- String msg = block + " is beening recovered, ignoring this request.";
|
|
|
- LOG.info(msg);
|
|
|
- throw new IOException(msg);
|
|
|
+
|
|
|
+ // Remove old block from blocks map. This always has to be done
|
|
|
+ // because the generation stamp of this block is changing.
|
|
|
+ blockManager.removeBlockFromMap(oldblockinfo);
|
|
|
+
|
|
|
+ // update last block, construct newblockinfo and add it to the blocks map
|
|
|
+ BlockInfoUnderConstruction newblockinfo =
|
|
|
+ new BlockInfoUnderConstruction(
|
|
|
+ newBlock, pendingFile.getReplication());
|
|
|
+ blockManager.addINode(newblockinfo, pendingFile);
|
|
|
+
|
|
|
+ // find the DatanodeDescriptor objects
|
|
|
+ DatanodeDescriptor[] descriptors = null;
|
|
|
+ if (newNodes.length > 0) {
|
|
|
+ descriptors = new DatanodeDescriptor[newNodes.length];
|
|
|
+ for(int i = 0; i < newNodes.length; i++) {
|
|
|
+ descriptors[i] = getDatanode(newNodes[i]);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // add locations into the INodeUnderConstruction
|
|
|
+ pendingFile.setLastBlock(newblockinfo, descriptors);
|
|
|
+
|
|
|
+ // persist blocks only if append is supported
|
|
|
+ String src = leaseManager.findPath(pendingFile);
|
|
|
+ if (supportAppends) {
|
|
|
+ dir.persistBlocks(src, pendingFile);
|
|
|
+ getEditLog().logSync();
|
|
|
}
|
|
|
- return nextGenerationStamp();
|
|
|
+ LOG.info("updatePipeline(" + oldBlock + ") successfully to " + newBlock);
|
|
|
+ return;
|
|
|
}
|
|
|
|
|
|
// rename was successful. If any part of the renamed subtree had
|