|
@@ -233,8 +233,9 @@ class FSNamesystem implements FSConstants {
|
|
|
* writes data. Subsequent items in the list must be provided in
|
|
|
* the connection to the first datanode.
|
|
|
* @return Return an array that consists of the block, plus a set
|
|
|
- * of machines, or null if src is invalid for creation (based on
|
|
|
- * {@link FSDirectory#isValidToCreate(UTF8)}.
|
|
|
+ * of machines
|
|
|
+ * @throws IOException if the filename is invalid
|
|
|
+ * {@link FSDirectory#isValidToCreate(UTF8)}.
|
|
|
*/
|
|
|
public synchronized Object[] startFile( UTF8 src,
|
|
|
UTF8 holder,
|
|
@@ -242,11 +243,12 @@ class FSNamesystem implements FSConstants {
|
|
|
boolean overwrite,
|
|
|
short replication
|
|
|
) throws IOException {
|
|
|
+ try {
|
|
|
if (pendingCreates.get(src) != null) {
|
|
|
- LOG.warning("Cannot create file " + src + " for " + holder +
|
|
|
+ String msg = "Cannot create file " + src + " for " + holder +
|
|
|
" on " + clientMachine +
|
|
|
- " because pendingCreates is non-null.");
|
|
|
- return null;
|
|
|
+ " because pendingCreates is non-null.";
|
|
|
+ throw new NameNode.AlreadyBeingCreatedException(msg);
|
|
|
}
|
|
|
|
|
|
if( replication > maxReplication )
|
|
@@ -261,27 +263,28 @@ class FSNamesystem implements FSConstants {
|
|
|
+ "Requested replication " + replication
|
|
|
+ " is less than the required minimum " + minReplication );
|
|
|
|
|
|
- boolean fileValid = dir.isValidToCreate(src);
|
|
|
- if (overwrite && ! fileValid) {
|
|
|
+ if (!dir.isValidToCreate(src)) {
|
|
|
+ if (overwrite) {
|
|
|
delete(src);
|
|
|
- fileValid = true;
|
|
|
- }
|
|
|
-
|
|
|
- if ( ! fileValid) {
|
|
|
- LOG.warning("Cannot start file because it is invalid. src=" + src);
|
|
|
- return null;
|
|
|
+ } else {
|
|
|
+ throw new IOException("Can't create file " + src +
|
|
|
+ ", because the filename is invalid.");
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
// Get the array of replication targets
|
|
|
DatanodeInfo targets[] = chooseTargets(replication, null, clientMachine);
|
|
|
if (targets.length < this.minReplication) {
|
|
|
- LOG.warning("Target-length is " + targets.length +
|
|
|
- ", below MIN_REPLICATION (" + this.minReplication+ ")");
|
|
|
- return null;
|
|
|
- }
|
|
|
+ throw new IOException("Target-length is " + targets.length +
|
|
|
+ ", below MIN_REPLICATION (" +
|
|
|
+ minReplication+ ")");
|
|
|
+ }
|
|
|
|
|
|
// Reserve space for this pending file
|
|
|
- pendingCreates.put(src, new FileUnderConstruction( replication ));
|
|
|
+ pendingCreates.put(src,
|
|
|
+ new FileUnderConstruction(replication,
|
|
|
+ holder,
|
|
|
+ clientMachine));
|
|
|
LOG.fine("Adding " + src + " to pendingCreates for " + holder);
|
|
|
synchronized (leases) {
|
|
|
Lease lease = (Lease) leases.get(holder);
|
|
@@ -302,6 +305,10 @@ class FSNamesystem implements FSConstants {
|
|
|
results[0] = allocateBlock(src);
|
|
|
results[1] = targets;
|
|
|
return results;
|
|
|
+ } catch (IOException ie) {
|
|
|
+ LOG.warning(ie.getMessage());
|
|
|
+ throw ie;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -315,29 +322,42 @@ class FSNamesystem implements FSConstants {
|
|
|
* are replicated. Will return an empty 2-elt array if we want the
|
|
|
* client to "try again later".
|
|
|
*/
|
|
|
- public synchronized Object[] getAdditionalBlock(UTF8 src, UTF8 clientMachine) {
|
|
|
- Object results[] = null;
|
|
|
+ public synchronized Object[] getAdditionalBlock(UTF8 src,
|
|
|
+ UTF8 clientName
|
|
|
+ ) throws IOException {
|
|
|
FileUnderConstruction pendingFile =
|
|
|
(FileUnderConstruction) pendingCreates.get(src);
|
|
|
- if (dir.getFile(src) == null && pendingFile != null) {
|
|
|
- results = new Object[2];
|
|
|
-
|
|
|
- //
|
|
|
- // If we fail this, bad things happen!
|
|
|
- //
|
|
|
- if (checkFileProgress(src)) {
|
|
|
- // Get the array of replication targets
|
|
|
- DatanodeInfo targets[] = chooseTargets(pendingFile.getReplication(), null, clientMachine);
|
|
|
- if (targets.length < this.minReplication) {
|
|
|
- return null;
|
|
|
- }
|
|
|
+ // make sure that we still have the lease on this file
|
|
|
+ if (pendingFile == null) {
|
|
|
+ throw new NameNode.LeaseExpiredException("No lease on " + src);
|
|
|
+ }
|
|
|
+ if (!pendingFile.getClientName().equals(clientName)) {
|
|
|
+ throw new NameNode.LeaseExpiredException("Lease mismatch on " + src +
|
|
|
+ " owned by " + pendingFile.getClientName() +
|
|
|
+ " and appended by " + clientName);
|
|
|
+ }
|
|
|
+ if (dir.getFile(src) != null) {
|
|
|
+ throw new IOException("File " + src + " created during write");
|
|
|
+ }
|
|
|
|
|
|
- // Create next block
|
|
|
- results[0] = allocateBlock(src);
|
|
|
- results[1] = targets;
|
|
|
- }
|
|
|
+ //
|
|
|
+ // If we fail this, bad things happen!
|
|
|
+ //
|
|
|
+ if (!checkFileProgress(src)) {
|
|
|
+ throw new NameNode.NotReplicatedYetException("Not replicated yet");
|
|
|
}
|
|
|
- return results;
|
|
|
+
|
|
|
+ // Get the array of replication targets
|
|
|
+ DatanodeInfo targets[] = chooseTargets(pendingFile.getReplication(),
|
|
|
+ null, pendingFile.getClientMachine());
|
|
|
+ if (targets.length < this.minReplication) {
|
|
|
+ throw new IOException("File " + src + " could only be replicated to " +
|
|
|
+ targets.length + " nodes, instead of " +
|
|
|
+ minReplication);
|
|
|
+ }
|
|
|
+
|
|
|
+ // Create next block
|
|
|
+ return new Object[]{allocateBlock(src), targets};
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -347,8 +367,10 @@ class FSNamesystem implements FSConstants {
|
|
|
//
|
|
|
// Remove the block from the pending creates list
|
|
|
//
|
|
|
- Vector pendingVector = (Vector) pendingCreates.get(src);
|
|
|
- if (pendingVector != null) {
|
|
|
+ FileUnderConstruction pendingFile =
|
|
|
+ (FileUnderConstruction) pendingCreates.get(src);
|
|
|
+ if (pendingFile != null) {
|
|
|
+ Vector pendingVector = pendingFile.getBlocks();
|
|
|
for (Iterator it = pendingVector.iterator(); it.hasNext(); ) {
|
|
|
Block cur = (Block) it.next();
|
|
|
if (cur.compareTo(b) == 0) {
|
|
@@ -368,7 +390,9 @@ class FSNamesystem implements FSConstants {
|
|
|
UTF8 holder
|
|
|
) throws IOException {
|
|
|
LOG.info("abandoning file in progress on " + src.toString());
|
|
|
- internalReleaseCreate(src, holder);
|
|
|
+ synchronized (leases) {
|
|
|
+ internalReleaseCreate(src, holder);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -387,9 +411,11 @@ class FSNamesystem implements FSConstants {
|
|
|
return STILL_WAITING;
|
|
|
}
|
|
|
|
|
|
- FileUnderConstruction pendingFile = (FileUnderConstruction) pendingCreates.get(src);
|
|
|
- int nrBlocks = pendingFile.size();
|
|
|
- Block pendingBlocks[] = (Block[]) pendingFile.toArray(new Block[nrBlocks]);
|
|
|
+ FileUnderConstruction pendingFile =
|
|
|
+ (FileUnderConstruction) pendingCreates.get(src);
|
|
|
+ Vector blocks = pendingFile.getBlocks();
|
|
|
+ int nrBlocks = blocks.size();
|
|
|
+ Block pendingBlocks[] = (Block[]) blocks.toArray(new Block[nrBlocks]);
|
|
|
|
|
|
//
|
|
|
// We have the pending blocks, but they won't have
|
|
@@ -473,7 +499,7 @@ class FSNamesystem implements FSConstants {
|
|
|
Block b = new Block();
|
|
|
FileUnderConstruction v =
|
|
|
(FileUnderConstruction) pendingCreates.get(src);
|
|
|
- v.add(b);
|
|
|
+ v.getBlocks().add(b);
|
|
|
pendingCreateBlocks.add(b);
|
|
|
return b;
|
|
|
}
|
|
@@ -486,7 +512,7 @@ class FSNamesystem implements FSConstants {
|
|
|
FileUnderConstruction v =
|
|
|
(FileUnderConstruction) pendingCreates.get(src);
|
|
|
|
|
|
- for (Iterator it = v.iterator(); it.hasNext(); ) {
|
|
|
+ for (Iterator it = v.getBlocks().iterator(); it.hasNext(); ) {
|
|
|
Block b = (Block) it.next();
|
|
|
TreeSet containingNodes = (TreeSet) blocksMap.get(b);
|
|
|
if (containingNodes == null || containingNodes.size() < this.minReplication) {
|
|
@@ -639,8 +665,8 @@ class FSNamesystem implements FSConstants {
|
|
|
class Lease implements Comparable {
|
|
|
public UTF8 holder;
|
|
|
public long lastUpdate;
|
|
|
- TreeSet locks = new TreeSet();
|
|
|
- TreeSet creates = new TreeSet();
|
|
|
+ private TreeSet locks = new TreeSet();
|
|
|
+ private TreeSet creates = new TreeSet();
|
|
|
|
|
|
public Lease(UTF8 holder) {
|
|
|
this.holder = holder;
|
|
@@ -803,7 +829,7 @@ class FSNamesystem implements FSConstants {
|
|
|
if (v != null) {
|
|
|
LOG.info("Removing " + src + " from pendingCreates for " +
|
|
|
holder + " (failure)");
|
|
|
- for (Iterator it2 = v.iterator(); it2.hasNext(); ) {
|
|
|
+ for (Iterator it2 = v.getBlocks().iterator(); it2.hasNext(); ) {
|
|
|
Block b = (Block) it2.next();
|
|
|
pendingCreateBlocks.remove(b);
|
|
|
}
|
|
@@ -1416,15 +1442,35 @@ class FSNamesystem implements FSConstants {
|
|
|
*
|
|
|
* @author shv
|
|
|
*/
|
|
|
- private class FileUnderConstruction extends Vector {
|
|
|
+ private class FileUnderConstruction {
|
|
|
private short blockReplication; // file replication
|
|
|
+ private Vector blocks;
|
|
|
+ private UTF8 clientName; // lease holder
|
|
|
+ private UTF8 clientMachine;
|
|
|
|
|
|
- FileUnderConstruction( short replication ) throws IOException {
|
|
|
+ FileUnderConstruction(short replication,
|
|
|
+ UTF8 clientName,
|
|
|
+ UTF8 clientMachine) throws IOException {
|
|
|
this.blockReplication = replication;
|
|
|
+ this.blocks = new Vector();
|
|
|
+ this.clientName = clientName;
|
|
|
+ this.clientMachine = clientMachine;
|
|
|
}
|
|
|
|
|
|
public short getReplication() {
|
|
|
return this.blockReplication;
|
|
|
}
|
|
|
+
|
|
|
+ public Vector getBlocks() {
|
|
|
+ return blocks;
|
|
|
+ }
|
|
|
+
|
|
|
+ public UTF8 getClientName() {
|
|
|
+ return clientName;
|
|
|
+ }
|
|
|
+
|
|
|
+ public UTF8 getClientMachine() {
|
|
|
+ return clientMachine;
|
|
|
+ }
|
|
|
}
|
|
|
}
|