|
@@ -228,6 +228,13 @@ class FSDirWriteFileOp {
|
|
|
// while chooseTarget() was executing.
|
|
|
LocatedBlock[] onRetryBlock = new LocatedBlock[1];
|
|
|
INodesInPath iip = fsn.dir.resolvePath(null, src, fileId);
|
|
|
+
|
|
|
+ INode[] missing = new INode[]{iip.getLastINode()};
|
|
|
+ INodesInPath existing = iip.getParentINodesInPath();
|
|
|
+ FSDirectory fsd = fsn.getFSDirectory();
|
|
|
+ // switch the locks
|
|
|
+ fsd.getINodeMap().latchWriteLock(existing, missing);
|
|
|
+
|
|
|
FileState fileState = analyzeFileState(fsn, iip, fileId, clientName,
|
|
|
previous, onRetryBlock);
|
|
|
final INodeFile pendingFile = fileState.inode;
|
|
@@ -392,8 +399,8 @@ class FSDirWriteFileOp {
|
|
|
}
|
|
|
fsn.checkFsObjectLimit();
|
|
|
INodeFile newNode = null;
|
|
|
- INodesInPath parent =
|
|
|
- FSDirMkdirOp.createAncestorDirectories(fsd, iip, permissions);
|
|
|
+ INodesInPath parent = FSDirMkdirOp.createMissingDirs(fsd,
|
|
|
+ iip.getParentINodesInPath(), permissions);
|
|
|
if (parent != null) {
|
|
|
iip = addFile(fsd, parent, iip.getLastLocalName(), permissions,
|
|
|
replication, blockSize, holder, clientMachine, shouldReplicate,
|
|
@@ -541,41 +548,22 @@ class FSDirWriteFileOp {
|
|
|
FSDirectory fsd, INodesInPath existing, byte[] localName,
|
|
|
PermissionStatus permissions, short replication, long preferredBlockSize,
|
|
|
String clientName, String clientMachine, boolean shouldReplicate,
|
|
|
- String ecPolicyName, String storagePolicy) throws IOException {
|
|
|
+ String ecPolicyName, String storagePolicy)
|
|
|
+ throws IOException {
|
|
|
|
|
|
Preconditions.checkNotNull(existing);
|
|
|
long modTime = now();
|
|
|
INodesInPath newiip;
|
|
|
fsd.writeLock();
|
|
|
try {
|
|
|
- boolean isStriped = false;
|
|
|
- ErasureCodingPolicy ecPolicy = null;
|
|
|
- byte storagepolicyid = 0;
|
|
|
- if (storagePolicy != null && !storagePolicy.isEmpty()) {
|
|
|
- BlockStoragePolicy policy =
|
|
|
- fsd.getBlockManager().getStoragePolicy(storagePolicy);
|
|
|
- if (policy == null) {
|
|
|
- throw new HadoopIllegalArgumentException(
|
|
|
- "Cannot find a block policy with the name " + storagePolicy);
|
|
|
- }
|
|
|
- storagepolicyid = policy.getId();
|
|
|
- }
|
|
|
- if (!shouldReplicate) {
|
|
|
- ecPolicy = FSDirErasureCodingOp.getErasureCodingPolicy(
|
|
|
- fsd.getFSNamesystem(), ecPolicyName, existing);
|
|
|
- if (ecPolicy != null && (!ecPolicy.isReplicationPolicy())) {
|
|
|
- isStriped = true;
|
|
|
- }
|
|
|
- }
|
|
|
- final BlockType blockType = isStriped ?
|
|
|
- BlockType.STRIPED : BlockType.CONTIGUOUS;
|
|
|
- final Short replicationFactor = (!isStriped ? replication : null);
|
|
|
- final Byte ecPolicyID = (isStriped ? ecPolicy.getId() : null);
|
|
|
- INodeFile newNode = newINodeFile(fsd.allocateNewInodeId(), permissions,
|
|
|
- modTime, modTime, replicationFactor, ecPolicyID, preferredBlockSize,
|
|
|
- storagepolicyid, blockType);
|
|
|
- newNode.setLocalName(localName);
|
|
|
- newNode.toUnderConstruction(clientName, clientMachine);
|
|
|
+ INodeFile newNode = createINodeFile(fsd, existing, localName,
|
|
|
+ permissions, replication, preferredBlockSize, clientName,
|
|
|
+ clientMachine, shouldReplicate, ecPolicyName, storagePolicy, modTime);
|
|
|
+
|
|
|
+ INode[] missing = new INode[] {newNode};
|
|
|
+ // switch the locks
|
|
|
+ fsd.getINodeMap().latchWriteLock(existing, missing);
|
|
|
+
|
|
|
newiip = fsd.addINode(existing, newNode, permissions.getPermission());
|
|
|
} finally {
|
|
|
fsd.writeUnlock();
|
|
@@ -593,6 +581,42 @@ class FSDirWriteFileOp {
|
|
|
return newiip;
|
|
|
}
|
|
|
|
|
|
+ private static INodeFile createINodeFile(FSDirectory fsd,
|
|
|
+ INodesInPath existing, byte[] localName, PermissionStatus permissions,
|
|
|
+ short replication, long preferredBlockSize, String clientName,
|
|
|
+ String clientMachine, boolean shouldReplicate, String ecPolicyName,
|
|
|
+ String storagePolicy, long modTime) throws IOException {
|
|
|
+ boolean isStriped = false;
|
|
|
+ ErasureCodingPolicy ecPolicy = null;
|
|
|
+ byte storagepolicyid = 0;
|
|
|
+ if (storagePolicy != null && !storagePolicy.isEmpty()) {
|
|
|
+ BlockStoragePolicy policy =
|
|
|
+ fsd.getBlockManager().getStoragePolicy(storagePolicy);
|
|
|
+ if (policy == null) {
|
|
|
+ throw new HadoopIllegalArgumentException(
|
|
|
+ "Cannot find a block policy with the name " + storagePolicy);
|
|
|
+ }
|
|
|
+ storagepolicyid = policy.getId();
|
|
|
+ }
|
|
|
+ if (!shouldReplicate) {
|
|
|
+ ecPolicy = FSDirErasureCodingOp.getErasureCodingPolicy(
|
|
|
+ fsd.getFSNamesystem(), ecPolicyName, existing);
|
|
|
+ if (ecPolicy != null && (!ecPolicy.isReplicationPolicy())) {
|
|
|
+ isStriped = true;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ final BlockType blockType = isStriped ?
|
|
|
+ BlockType.STRIPED : BlockType.CONTIGUOUS;
|
|
|
+ final Short replicationFactor = (!isStriped ? replication : null);
|
|
|
+ final Byte ecPolicyID = (isStriped ? ecPolicy.getId() : null);
|
|
|
+ INodeFile newNode = newINodeFile(fsd.allocateNewInodeId(), permissions,
|
|
|
+ modTime, modTime, replicationFactor, ecPolicyID, preferredBlockSize,
|
|
|
+ storagepolicyid, blockType);
|
|
|
+ newNode.setLocalName(localName);
|
|
|
+ newNode.toUnderConstruction(clientName, clientMachine);
|
|
|
+ return newNode;
|
|
|
+ }
|
|
|
+
|
|
|
private static FileState analyzeFileState(
|
|
|
FSNamesystem fsn, INodesInPath iip, long fileId, String clientName,
|
|
|
ExtendedBlock previous, LocatedBlock[] onRetryBlock)
|
|
@@ -687,6 +711,14 @@ class FSDirWriteFileOp {
|
|
|
}
|
|
|
checkBlock(fsn, last);
|
|
|
INodesInPath iip = fsn.dir.resolvePath(pc, src, fileId);
|
|
|
+
|
|
|
+ assert (iip.getLastINode() instanceof INodeFile);
|
|
|
+ INode[] missing = new INode[] {iip.getLastINode()};
|
|
|
+ INodesInPath existing = iip.getParentINodesInPath();
|
|
|
+ // switch the locks
|
|
|
+ FSDirectory fsd = fsn.getFSDirectory();
|
|
|
+ fsd.getINodeMap().latchWriteLock(existing, missing);
|
|
|
+
|
|
|
return completeFileInternal(fsn, iip, holder,
|
|
|
ExtendedBlock.getLocalBlock(last), fileId);
|
|
|
}
|