|
@@ -1042,10 +1042,13 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
|
|
|
// holder is trying to recreate this file. This should never occur.
|
|
|
//
|
|
|
if (lease != null) {
|
|
|
- throw new AlreadyBeingCreatedException(
|
|
|
+ Lease leaseFile = leaseManager.getLeaseByPath(src);
|
|
|
+ if (leaseFile != null && leaseFile.equals(lease)) {
|
|
|
+ throw new AlreadyBeingCreatedException(
|
|
|
"failed to create file " + src + " for " + holder +
|
|
|
" on client " + clientMachine +
|
|
|
" because current leaseholder is trying to recreate file.");
|
|
|
+ }
|
|
|
}
|
|
|
//
|
|
|
// Find the original holder.
|
|
@@ -1818,12 +1821,31 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * This is invoked when a lease expires. On lease expiry,
|
|
|
+ * all the files that were written from that dfsclient should be
|
|
|
+ * recovered.
|
|
|
+ */
|
|
|
+ void internalReleaseLease(Lease lease, String src) throws IOException {
|
|
|
+ if (lease.hasPath()) {
|
|
|
+ // make a copy of the paths because internalReleaseLeaseOne removes
|
|
|
+ // pathnames from the lease record.
|
|
|
+ String[] leasePaths = new String[lease.getPaths().size()];
|
|
|
+ lease.getPaths().toArray(leasePaths);
|
|
|
+ for (String p: leasePaths) {
|
|
|
+ internalReleaseLeaseOne(lease, p);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ internalReleaseLeaseOne(lease, src);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Move a file that is being written to be immutable.
|
|
|
* @param src The filename
|
|
|
* @param lease The lease for the client creating the file
|
|
|
*/
|
|
|
- void internalReleaseLease(Lease lease, String src) throws IOException {
|
|
|
+ void internalReleaseLeaseOne(Lease lease, String src) throws IOException {
|
|
|
LOG.info("Recovering lease=" + lease + ", src=" + src);
|
|
|
|
|
|
INodeFile iFile = dir.getFileINode(src);
|
|
@@ -1931,20 +1953,11 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
|
|
|
descriptors = new DatanodeDescriptor[newtargets.length];
|
|
|
for(int i = 0; i < newtargets.length; i++) {
|
|
|
descriptors[i] = getDatanode(newtargets[i]);
|
|
|
- }
|
|
|
- }
|
|
|
- if (closeFile) {
|
|
|
- // the file is getting closed. Insert block locations into blocksMap.
|
|
|
- // Otherwise fsck will report these blocks as MISSING, especially if the
|
|
|
- // blocksReceived from Datanodes take a long time to arrive.
|
|
|
- for (int i = 0; i < descriptors.length; i++) {
|
|
|
descriptors[i].addBlock(newblockinfo);
|
|
|
}
|
|
|
- pendingFile.setLastBlock(newblockinfo, null);
|
|
|
- } else {
|
|
|
- // add locations into the INodeUnderConstruction
|
|
|
- pendingFile.setLastBlock(newblockinfo, descriptors);
|
|
|
}
|
|
|
+ // add locations into the INodeUnderConstruction
|
|
|
+ pendingFile.setLastBlock(newblockinfo, descriptors);
|
|
|
}
|
|
|
|
|
|
// If this commit does not want to close the file, persist
|
|
@@ -2329,9 +2342,11 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
|
|
|
LOG.warn("ReplicationMonitor thread received InterruptedException." + ie);
|
|
|
break;
|
|
|
} catch (IOException ie) {
|
|
|
- LOG.warn("ReplicationMonitor thread received exception. " + ie);
|
|
|
+ LOG.warn("ReplicationMonitor thread received exception. " + ie + " " +
|
|
|
+ StringUtils.stringifyException(ie));
|
|
|
} catch (Throwable t) {
|
|
|
- LOG.warn("ReplicationMonitor thread received Runtime exception. " + t);
|
|
|
+ LOG.warn("ReplicationMonitor thread received Runtime exception. " + t + " " +
|
|
|
+ StringUtils.stringifyException(t));
|
|
|
Runtime.getRuntime().exit(-1);
|
|
|
}
|
|
|
}
|
|
@@ -2933,6 +2948,24 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
|
|
|
DatanodeDescriptor node,
|
|
|
DatanodeDescriptor delNodeHint) {
|
|
|
BlockInfo storedBlock = blocksMap.getStoredBlock(block);
|
|
|
+ if (storedBlock == null) {
|
|
|
+ // if the block with a WILDCARD generation stamp matches and the
|
|
|
+ // corresponding file is under construction, then accept this block.
|
|
|
+ // This block has a diferent generation stamp on the datanode
|
|
|
+ // because of a lease-recovery-attempt.
|
|
|
+ Block nblk = new Block(block.getBlockId());
|
|
|
+ storedBlock = blocksMap.getStoredBlock(nblk);
|
|
|
+ if (storedBlock != null && storedBlock.getINode() != null &&
|
|
|
+ (storedBlock.getGenerationStamp() <= block.getGenerationStamp() ||
|
|
|
+ storedBlock.getINode().isUnderConstruction())) {
|
|
|
+ NameNode.stateChangeLog.info("BLOCK* NameSystem.addStoredBlock: "
|
|
|
+ + "addStoredBlock request received for " + block + " on "
|
|
|
+ + node.getName() + " size " + block.getNumBytes()
|
|
|
+ + " and it belongs to a file under construction. ");
|
|
|
+ } else {
|
|
|
+ storedBlock = null;
|
|
|
+ }
|
|
|
+ }
|
|
|
if(storedBlock == null || storedBlock.getINode() == null) {
|
|
|
// If this block does not belong to anyfile, then we are done.
|
|
|
NameNode.stateChangeLog.info("BLOCK* NameSystem.addStoredBlock: "
|
|
@@ -2953,6 +2986,8 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
|
|
|
if (block != storedBlock) {
|
|
|
if (block.getNumBytes() >= 0) {
|
|
|
long cursize = storedBlock.getNumBytes();
|
|
|
+ INodeFile file = (storedBlock != null) ? storedBlock.getINode() : null;
|
|
|
+ boolean underConstruction = (file == null ? false : file.isUnderConstruction());
|
|
|
if (cursize == 0) {
|
|
|
storedBlock.setNumBytes(block.getNumBytes());
|
|
|
} else if (cursize != block.getNumBytes()) {
|
|
@@ -2964,9 +2999,11 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
|
|
|
if (cursize > block.getNumBytes()) {
|
|
|
// new replica is smaller in size than existing block.
|
|
|
// Mark the new replica as corrupt.
|
|
|
- LOG.warn("Mark new replica " + block + " from " + node.getName() +
|
|
|
- "as corrupt because its length is shorter than existing ones");
|
|
|
- markBlockAsCorrupt(block, node);
|
|
|
+ if (!underConstruction) {
|
|
|
+ LOG.warn("Mark new replica " + block + " from " + node.getName() +
|
|
|
+ "as corrupt because its length is shorter than existing ones");
|
|
|
+ markBlockAsCorrupt(block, node);
|
|
|
+ }
|
|
|
} else {
|
|
|
// new replica is larger in size than existing block.
|
|
|
// Mark pre-existing replicas as corrupt.
|
|
@@ -2980,7 +3017,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
|
|
|
nodes[count++] = dd;
|
|
|
}
|
|
|
}
|
|
|
- for (int j = 0; j < count; j++) {
|
|
|
+ for (int j = 0; j < count && !underConstruction; j++) {
|
|
|
LOG.warn("Mark existing replica " + block + " from " + node.getName() +
|
|
|
" as corrupt because its length is shorter than the new one");
|
|
|
markBlockAsCorrupt(block, nodes[j]);
|
|
@@ -3003,7 +3040,6 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
|
|
|
}
|
|
|
|
|
|
//Updated space consumed if required.
|
|
|
- INodeFile file = (storedBlock != null) ? storedBlock.getINode() : null;
|
|
|
long diff = (file == null) ? 0 :
|
|
|
(file.getPreferredBlockSize() - storedBlock.getNumBytes());
|
|
|
|