|
@@ -38,6 +38,7 @@ import java.io.File;
|
|
import java.io.FileWriter;
|
|
import java.io.FileWriter;
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
import java.io.PrintWriter;
|
|
import java.io.PrintWriter;
|
|
|
|
+import java.io.DataOutputStream;
|
|
import java.net.InetSocketAddress;
|
|
import java.net.InetSocketAddress;
|
|
import java.util.*;
|
|
import java.util.*;
|
|
import java.util.Map.Entry;
|
|
import java.util.Map.Entry;
|
|
@@ -221,6 +222,12 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
|
|
|
|
|
|
private long maxFsObjects = 0; // maximum number of fs objects
|
|
private long maxFsObjects = 0; // maximum number of fs objects
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * The global generation stamp for this file system.
|
|
|
|
+ * Valid values start from 1000.
|
|
|
|
+ */
|
|
|
|
+ private GenerationStamp generationStamp = new GenerationStamp(1000);
|
|
|
|
+
|
|
private long softLimit = LEASE_SOFTLIMIT_PERIOD;
|
|
private long softLimit = LEASE_SOFTLIMIT_PERIOD;
|
|
private long hardLimit = LEASE_HARDLIMIT_PERIOD;
|
|
private long hardLimit = LEASE_HARDLIMIT_PERIOD;
|
|
|
|
|
|
@@ -1029,27 +1036,19 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
|
|
DatanodeDescriptor clientNode =
|
|
DatanodeDescriptor clientNode =
|
|
host2DataNodeMap.getDatanodeByHost(clientMachine);
|
|
host2DataNodeMap.getDatanodeByHost(clientMachine);
|
|
|
|
|
|
- synchronized (sortedLeases) {
|
|
|
|
- Lease lease = getLease(holder);
|
|
|
|
- if (lease == null) {
|
|
|
|
- lease = new Lease(holder);
|
|
|
|
- putLease(holder, lease);
|
|
|
|
- sortedLeases.add(lease);
|
|
|
|
- } else {
|
|
|
|
- sortedLeases.remove(lease);
|
|
|
|
- lease.renew();
|
|
|
|
- sortedLeases.add(lease);
|
|
|
|
- }
|
|
|
|
- lease.startedCreate(src);
|
|
|
|
- }
|
|
|
|
|
|
+ addLease(src, holder);
|
|
|
|
|
|
//
|
|
//
|
|
// Now we can add the name to the filesystem. This file has no
|
|
// Now we can add the name to the filesystem. This file has no
|
|
// blocks associated with it.
|
|
// blocks associated with it.
|
|
//
|
|
//
|
|
checkFsObjectLimit();
|
|
checkFsObjectLimit();
|
|
|
|
+
|
|
|
|
+ // increment global generation stamp
|
|
|
|
+ long genstamp = generationStamp.nextStamp();
|
|
|
|
+
|
|
INode newNode = dir.addFile(src, permissions,
|
|
INode newNode = dir.addFile(src, permissions,
|
|
- replication, blockSize, holder, clientMachine, clientNode);
|
|
|
|
|
|
+ replication, blockSize, holder, clientMachine, clientNode, genstamp);
|
|
if (newNode == null) {
|
|
if (newNode == null) {
|
|
throw new IOException("DIR* NameSystem.startFile: " +
|
|
throw new IOException("DIR* NameSystem.startFile: " +
|
|
"Unable to add file to namespace.");
|
|
"Unable to add file to namespace.");
|
|
@@ -1106,28 +1105,30 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
|
|
blockSize = pendingFile.getPreferredBlockSize();
|
|
blockSize = pendingFile.getPreferredBlockSize();
|
|
clientNode = pendingFile.getClientNode();
|
|
clientNode = pendingFile.getClientNode();
|
|
replication = (int)pendingFile.getReplication();
|
|
replication = (int)pendingFile.getReplication();
|
|
- newBlock = allocateBlock(src, pendingFile);
|
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ // choose targets for the new block tobe allocated.
|
|
DatanodeDescriptor targets[] = replicator.chooseTarget(replication,
|
|
DatanodeDescriptor targets[] = replicator.chooseTarget(replication,
|
|
clientNode,
|
|
clientNode,
|
|
null,
|
|
null,
|
|
blockSize);
|
|
blockSize);
|
|
if (targets.length < this.minReplication) {
|
|
if (targets.length < this.minReplication) {
|
|
- // if we could not find any targets, remove this block from file
|
|
|
|
- synchronized (this) {
|
|
|
|
- INodeFile iFile = dir.getFileINode(src);
|
|
|
|
- if (iFile != null && iFile.isUnderConstruction()) {
|
|
|
|
- INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)iFile;
|
|
|
|
- if (pendingFile.getClientName().equals(clientName)) {
|
|
|
|
- dir.removeBlock(src, pendingFile, newBlock);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
throw new IOException("File " + src + " could only be replicated to " +
|
|
throw new IOException("File " + src + " could only be replicated to " +
|
|
targets.length + " nodes, instead of " +
|
|
targets.length + " nodes, instead of " +
|
|
minReplication);
|
|
minReplication);
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ // Allocate a new block and record it in the INode.
|
|
|
|
+ synchronized (this) {
|
|
|
|
+ INodeFileUnderConstruction pendingFile = checkLease(src, clientName);
|
|
|
|
+ if (!checkFileProgress(pendingFile, false)) {
|
|
|
|
+ throw new NotReplicatedYetException("Not replicated yet:" + src);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // allocate new block record block locations in INode.
|
|
|
|
+ newBlock = allocateBlock(src, pendingFile);
|
|
|
|
+ pendingFile.setLastBlockLocations(targets);
|
|
|
|
+ }
|
|
|
|
|
|
// Create next block
|
|
// Create next block
|
|
return new LocatedBlock(newBlock, targets, fileLength);
|
|
return new LocatedBlock(newBlock, targets, fileLength);
|
|
@@ -1143,7 +1144,7 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
|
|
//
|
|
//
|
|
NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
|
|
NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
|
|
+b.getBlockName()+"of file "+src);
|
|
+b.getBlockName()+"of file "+src);
|
|
- INode file = checkLease(src, holder);
|
|
|
|
|
|
+ INodeFileUnderConstruction file = checkLease(src, holder);
|
|
dir.removeBlock(src, file, b);
|
|
dir.removeBlock(src, file, b);
|
|
NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
|
|
NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
|
|
+ b.getBlockName()
|
|
+ b.getBlockName()
|
|
@@ -1234,22 +1235,13 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
|
|
INodeFile newFile = pendingFile.convertToInodeFile();
|
|
INodeFile newFile = pendingFile.convertToInodeFile();
|
|
dir.replaceNode(src, pendingFile, newFile);
|
|
dir.replaceNode(src, pendingFile, newFile);
|
|
|
|
|
|
- // persist block allocations for this file
|
|
|
|
- dir.persistBlocks(src, newFile);
|
|
|
|
|
|
+ // close file and persist block allocations for this file
|
|
|
|
+ dir.closeFile(src, newFile);
|
|
|
|
|
|
NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " + src
|
|
NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " + src
|
|
+ " blocklist persisted");
|
|
+ " blocklist persisted");
|
|
|
|
|
|
- synchronized (sortedLeases) {
|
|
|
|
- Lease lease = getLease(holder);
|
|
|
|
- if (lease != null) {
|
|
|
|
- lease.completedCreate(src);
|
|
|
|
- if (!lease.hasLocks()) {
|
|
|
|
- removeLease(holder);
|
|
|
|
- sortedLeases.remove(lease);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
|
|
+ removeLease(src, holder);
|
|
|
|
|
|
//
|
|
//
|
|
// REMIND - mjc - this should be done only after we wait a few secs.
|
|
// REMIND - mjc - this should be done only after we wait a few secs.
|
|
@@ -1463,23 +1455,26 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
|
|
checkPermission(src, false, null, FsAction.WRITE, null, FsAction.ALL);
|
|
checkPermission(src, false, null, FsAction.WRITE, null, FsAction.ALL);
|
|
}
|
|
}
|
|
|
|
|
|
- Block deletedBlocks[] = dir.delete(src);
|
|
|
|
- if (deletedBlocks != null) {
|
|
|
|
- for (int i = 0; i < deletedBlocks.length; i++) {
|
|
|
|
- Block b = deletedBlocks[i];
|
|
|
|
-
|
|
|
|
- for (Iterator<DatanodeDescriptor> it =
|
|
|
|
- blocksMap.nodeIterator(b); it.hasNext();) {
|
|
|
|
- DatanodeDescriptor node = it.next();
|
|
|
|
- addToInvalidates(b, node);
|
|
|
|
- NameNode.stateChangeLog.info("BLOCK* NameSystem.delete: "
|
|
|
|
- + b.getBlockName() + " is added to invalidSet of "
|
|
|
|
- + node.getName());
|
|
|
|
- }
|
|
|
|
|
|
+ ArrayList<Block> deletedBlocks = new ArrayList<Block>();
|
|
|
|
+ INode old = dir.delete(src, deletedBlocks);
|
|
|
|
+ if (old == null) {
|
|
|
|
+ return false;
|
|
|
|
+ }
|
|
|
|
+ for (Block b : deletedBlocks) {
|
|
|
|
+ for (Iterator<DatanodeDescriptor> it =
|
|
|
|
+ blocksMap.nodeIterator(b); it.hasNext();) {
|
|
|
|
+ DatanodeDescriptor node = it.next();
|
|
|
|
+ addToInvalidates(b, node);
|
|
|
|
+ NameNode.stateChangeLog.info("BLOCK* NameSystem.delete: "
|
|
|
|
+ + b.getBlockName() + " is added to invalidSet of "
|
|
|
|
+ + node.getName());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
-
|
|
|
|
- return (deletedBlocks != null);
|
|
|
|
|
|
+ if (old.isUnderConstruction()) {
|
|
|
|
+ INodeFileUnderConstruction cons = (INodeFileUnderConstruction) old;
|
|
|
|
+ removeLease(src, cons.getClientName());
|
|
|
|
+ }
|
|
|
|
+ return true;
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -1586,6 +1581,24 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
|
|
return dir.getContentLength(src);
|
|
return dir.getContentLength(src);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /** Persist all metadata about this file.
|
|
|
|
+ * @param src The string representation of the path
|
|
|
|
+ * @param clientName The string representation of the client
|
|
|
|
+ * @throws IOException if path does not exist
|
|
|
|
+ */
|
|
|
|
+ void fsync(String src, String clientName) throws IOException {
|
|
|
|
+
|
|
|
|
+ NameNode.stateChangeLog.info("BLOCK* NameSystem.fsync: file "
|
|
|
|
+ + src + " for " + clientName);
|
|
|
|
+ synchronized (this) {
|
|
|
|
+ if (isInSafeMode()) {
|
|
|
|
+ throw new SafeModeException("Cannot fsync file " + src, safeMode);
|
|
|
|
+ }
|
|
|
|
+ INodeFileUnderConstruction pendingFile = checkLease(src, clientName);
|
|
|
|
+ dir.persistBlocks(src, pendingFile);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
/************************************************************
|
|
/************************************************************
|
|
* A Lease governs all the locks held by a single client.
|
|
* A Lease governs all the locks held by a single client.
|
|
* For each client there's a corresponding lease, whose
|
|
* For each client there's a corresponding lease, whose
|
|
@@ -1689,6 +1702,10 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
|
|
String getHolder() throws IOException {
|
|
String getHolder() throws IOException {
|
|
return holder.getString();
|
|
return holder.getString();
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ Collection<StringBytesWritable> getPaths() throws IOException {
|
|
|
|
+ return creates;
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
/******************************************************
|
|
/******************************************************
|
|
@@ -1778,8 +1795,8 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
|
|
INodeFile newFile = pendingFile.convertToInodeFile();
|
|
INodeFile newFile = pendingFile.convertToInodeFile();
|
|
dir.replaceNode(src, pendingFile, newFile);
|
|
dir.replaceNode(src, pendingFile, newFile);
|
|
|
|
|
|
- // persist block allocations for this file
|
|
|
|
- dir.persistBlocks(src, newFile);
|
|
|
|
|
|
+ // close file and persist block allocations for this file
|
|
|
|
+ dir.closeFile(src, newFile);
|
|
|
|
|
|
NameNode.stateChangeLog.debug("DIR* NameSystem.internalReleaseCreate: " +
|
|
NameNode.stateChangeLog.debug("DIR* NameSystem.internalReleaseCreate: " +
|
|
src + " is no longer written to by " +
|
|
src + " is no longer written to by " +
|
|
@@ -4049,7 +4066,6 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
|
|
this.lmthread.interrupt();
|
|
this.lmthread.interrupt();
|
|
}
|
|
}
|
|
|
|
|
|
-
|
|
|
|
public long getFilesTotal() {
|
|
public long getFilesTotal() {
|
|
return this.dir.totalInodes();
|
|
return this.dir.totalInodes();
|
|
}
|
|
}
|
|
@@ -4122,4 +4138,91 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
|
|
}
|
|
}
|
|
return numDead;
|
|
return numDead;
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Sets the generation stamp for this filesystem
|
|
|
|
+ */
|
|
|
|
+ void setGenerationStamp(long stamp) {
|
|
|
|
+ generationStamp.setStamp(stamp);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Gets the generation stamp for this filesystem
|
|
|
|
+ */
|
|
|
|
+ long getGenerationStamp() {
|
|
|
|
+ return generationStamp.getStamp();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * deletes the lease for the specified file
|
|
|
|
+ */
|
|
|
|
+ void removeLease(String src, String holder) throws IOException {
|
|
|
|
+ synchronized (sortedLeases) {
|
|
|
|
+ Lease lease = getLease(holder);
|
|
|
|
+ if (lease != null) {
|
|
|
|
+ lease.completedCreate(src);
|
|
|
|
+ if (!lease.hasLocks()) {
|
|
|
|
+ removeLease(holder);
|
|
|
|
+ sortedLeases.remove(lease);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Adds (or re-adds) the lease for the specified file.
|
|
|
|
+ */
|
|
|
|
+ void addLease(String src, String holder) throws IOException {
|
|
|
|
+ synchronized (sortedLeases) {
|
|
|
|
+ Lease lease = getLease(holder);
|
|
|
|
+ if (lease == null) {
|
|
|
|
+ lease = new Lease(holder);
|
|
|
|
+ putLease(holder, lease);
|
|
|
|
+ sortedLeases.add(lease);
|
|
|
|
+ } else {
|
|
|
|
+ sortedLeases.remove(lease);
|
|
|
|
+ lease.renew();
|
|
|
|
+ sortedLeases.add(lease);
|
|
|
|
+ }
|
|
|
|
+ lease.startedCreate(src);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Returns the number of leases currently in the system
|
|
|
|
+ */
|
|
|
|
+ int countLease() {
|
|
|
|
+ synchronized (sortedLeases) {
|
|
|
|
+ return sortedLeases.size();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Serializes leases
|
|
|
|
+ */
|
|
|
|
+ void saveFilesUnderConstruction(DataOutputStream out) throws IOException {
|
|
|
|
+ synchronized (sortedLeases) {
|
|
|
|
+ out.writeInt(sortedLeases.size()); // write the size
|
|
|
|
+ for (Iterator<Lease> it = sortedLeases.iterator(); it.hasNext();) {
|
|
|
|
+ Lease lease = it.next();
|
|
|
|
+ Collection<StringBytesWritable> files = lease.getPaths();
|
|
|
|
+ for (Iterator<StringBytesWritable> i = files.iterator(); i.hasNext();){
|
|
|
|
+ String path = i.next().getString();
|
|
|
|
+
|
|
|
|
+ // verify that path exists in namespace
|
|
|
|
+ INode node = dir.getFileINode(path);
|
|
|
|
+ if (node == null) {
|
|
|
|
+ throw new IOException("saveLeases found path " + path +
|
|
|
|
+ " but no matching entry in namespace.");
|
|
|
|
+ }
|
|
|
|
+ if (!node.isUnderConstruction()) {
|
|
|
|
+ throw new IOException("saveLeases found path " + path +
|
|
|
|
+ " but is not under construction.");
|
|
|
|
+ }
|
|
|
|
+ INodeFileUnderConstruction cons = (INodeFileUnderConstruction) node;
|
|
|
|
+ FSImage.writeINodeUnderConstruction(out, cons);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
}
|
|
}
|