|
@@ -18,6 +18,7 @@
|
|
|
package org.apache.hadoop.hdfs.server.namenode;
|
|
|
|
|
|
import com.google.common.base.Preconditions;
|
|
|
+import com.google.protobuf.ByteString;
|
|
|
import org.apache.commons.io.Charsets;
|
|
|
import org.apache.hadoop.HadoopIllegalArgumentException;
|
|
|
import org.apache.hadoop.crypto.CipherSuite;
|
|
@@ -26,6 +27,7 @@ import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
|
|
|
import org.apache.hadoop.fs.CreateFlag;
|
|
|
import org.apache.hadoop.fs.FileAlreadyExistsException;
|
|
|
import org.apache.hadoop.fs.FileEncryptionInfo;
|
|
|
+import org.apache.hadoop.fs.InvalidPathException;
|
|
|
import org.apache.hadoop.fs.XAttr;
|
|
|
import org.apache.hadoop.fs.permission.AclEntry;
|
|
|
import org.apache.hadoop.fs.permission.FsAction;
|
|
@@ -47,13 +49,13 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderCon
|
|
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
|
|
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
|
|
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
|
|
-import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
|
|
|
import org.apache.hadoop.net.Node;
|
|
|
import org.apache.hadoop.net.NodeBase;
|
|
|
import org.apache.hadoop.util.ChunkedArrayList;
|
|
|
|
|
|
import java.io.FileNotFoundException;
|
|
|
import java.io.IOException;
|
|
|
+import java.nio.ByteBuffer;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Arrays;
|
|
|
import java.util.Collections;
|
|
@@ -335,105 +337,147 @@ class FSDirWriteFileOp {
|
|
|
|
|
|
boolean isRawPath = FSDirectory.isReservedRawName(src);
|
|
|
FSDirectory fsd = fsn.getFSDirectory();
|
|
|
- byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
|
|
|
- src = fsd.resolvePath(pc, src, pathComponents);
|
|
|
- INodesInPath iip = fsd.getINodesInPath4Write(src);
|
|
|
+ final StringMap ugid = fsd.ugid();
|
|
|
|
|
|
- // Verify that the destination does not exist as a directory already.
|
|
|
- final INode inode = iip.getLastINode();
|
|
|
- if (inode != null && inode.isDirectory()) {
|
|
|
- throw new FileAlreadyExistsException(src +
|
|
|
- " already exists as a directory");
|
|
|
- }
|
|
|
+ try (RWTransaction tx = fsd.newRWTransaction().begin()) {
|
|
|
+ Resolver.Result paths = Resolver.resolve(tx, src);
|
|
|
+ if (paths.invalidPath()) {
|
|
|
+ throw new InvalidPathException(src);
|
|
|
+ }
|
|
|
+
|
|
|
+ final FlatINodesInPath iip = paths.inodesInPath();
|
|
|
+ // Verify that the destination does not exist as a directory already.
|
|
|
+ if (paths.ok()) {
|
|
|
+ FlatINode inode = paths.inodesInPath().getLastINode();
|
|
|
+ if (inode.isDirectory()) {
|
|
|
+ throw new FileAlreadyExistsException(src +
|
|
|
+ " already exists as a directory");
|
|
|
+ }
|
|
|
|
|
|
- final INodeFile myFile = INodeFile.valueOf(inode, src, true);
|
|
|
- if (fsd.isPermissionEnabled()) {
|
|
|
- if (overwrite && myFile != null) {
|
|
|
- fsd.checkPathAccess(pc, iip, FsAction.WRITE);
|
|
|
+ if (fsd.isPermissionEnabled()) {
|
|
|
+ if (overwrite) {
|
|
|
+ fsd.checkPathAccess(pc, iip, FsAction.WRITE);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
+
|
|
|
+ if (fsd.isPermissionEnabled()) {
|
|
|
/*
|
|
|
* To overwrite existing file, need to check 'w' permission
|
|
|
* of parent (equals to ancestor in this case)
|
|
|
*/
|
|
|
- fsd.checkAncestorAccess(pc, iip, FsAction.WRITE);
|
|
|
- }
|
|
|
-
|
|
|
- if (!createParent) {
|
|
|
- fsd.verifyParentDir(iip, src);
|
|
|
- }
|
|
|
-
|
|
|
- if (myFile == null && !create) {
|
|
|
- throw new FileNotFoundException("Can't overwrite non-existent " +
|
|
|
- src + " for client " + clientMachine);
|
|
|
- }
|
|
|
-
|
|
|
- FileEncryptionInfo feInfo = null;
|
|
|
+ fsd.checkAncestorAccess(pc, paths, FsAction.WRITE);
|
|
|
+ }
|
|
|
|
|
|
- final EncryptionZone zone = fsd.getEZForPath(iip);
|
|
|
- if (zone != null) {
|
|
|
- // The path is now within an EZ, but we're missing encryption parameters
|
|
|
- if (suite == null || edek == null) {
|
|
|
- throw new RetryStartFileException();
|
|
|
+ if (!createParent && FlatNSUtil.hasNextLevelInPath(paths.src, paths
|
|
|
+ .offset)) {
|
|
|
+ throw new FileNotFoundException(paths.src.substring(0, paths.offset));
|
|
|
}
|
|
|
- // Path is within an EZ and we have provided encryption parameters.
|
|
|
- // Make sure that the generated EDEK matches the settings of the EZ.
|
|
|
- final String ezKeyName = zone.getKeyName();
|
|
|
- if (!ezKeyName.equals(edek.getEncryptionKeyName())) {
|
|
|
- throw new RetryStartFileException();
|
|
|
+
|
|
|
+ if (paths.notFound() && !create) {
|
|
|
+ throw new FileNotFoundException("Can't overwrite non-existent " +
|
|
|
+ src + " for client " + clientMachine);
|
|
|
}
|
|
|
- feInfo = new FileEncryptionInfo(suite, version,
|
|
|
- edek.getEncryptedKeyVersion().getMaterial(),
|
|
|
- edek.getEncryptedKeyIv(),
|
|
|
- ezKeyName, edek.getEncryptionKeyVersionName());
|
|
|
- }
|
|
|
|
|
|
- if (myFile != null) {
|
|
|
- if (overwrite) {
|
|
|
- // TODO
|
|
|
-// List<INode> toRemoveINodes = new ChunkedArrayList<>();
|
|
|
-// List<Long> toRemoveUCFiles = new ChunkedArrayList<>();
|
|
|
-// long ret = FSDirDeleteOp.delete(fsd, iip, toRemoveBlocks,
|
|
|
-// toRemoveINodes, toRemoveUCFiles, now());
|
|
|
-// if (ret >= 0) {
|
|
|
-// iip = INodesInPath.replace(iip, iip.length() - 1, null);
|
|
|
-// FSDirDeleteOp.incrDeletedFileCount(ret);
|
|
|
-// fsn.removeLeasesAndINodes(toRemoveUCFiles, toRemoveINodes, true);
|
|
|
+ // TODO: Handle encryption
|
|
|
+ FileEncryptionInfo feInfo = null;
|
|
|
+
|
|
|
+// final EncryptionZone zone = fsd.getEZForPath(iip);
|
|
|
+// if (zone != null) {
|
|
|
+// // The path is now within an EZ, but we're missing encryption parameters
|
|
|
+// if (suite == null || edek == null) {
|
|
|
+// throw new RetryStartFileException();
|
|
|
// }
|
|
|
- } else {
|
|
|
- // If lease soft limit time is expired, recover the lease
|
|
|
- fsn.recoverLeaseInternal(FSNamesystem.RecoverLeaseOp.CREATE_FILE, iip,
|
|
|
- src, holder, clientMachine, false);
|
|
|
- throw new FileAlreadyExistsException(src + " for client " +
|
|
|
- clientMachine + " already exists");
|
|
|
+// // Path is within an EZ and we have provided encryption parameters.
|
|
|
+// // Make sure that the generated EDEK matches the settings of the EZ.
|
|
|
+// final String ezKeyName = zone.getKeyName();
|
|
|
+// if (!ezKeyName.equals(edek.getEncryptionKeyName())) {
|
|
|
+// throw new RetryStartFileException();
|
|
|
+// }
|
|
|
+// feInfo = new FileEncryptionInfo(suite, version,
|
|
|
+// edek.getEncryptedKeyVersion().getMaterial(),
|
|
|
+// edek.getEncryptedKeyIv(),
|
|
|
+// ezKeyName, edek.getEncryptionKeyVersionName());
|
|
|
+// }
|
|
|
+
|
|
|
+ if (paths.ok()) {
|
|
|
+ if (overwrite) {
|
|
|
+ // TODO
|
|
|
+ List<Long> toRemoveUCFiles = new ChunkedArrayList<>();
|
|
|
+ long ret = FSDirDeleteOp.delete(tx, paths, toRemoveBlocks,
|
|
|
+ toRemoveUCFiles, now());
|
|
|
+ if (ret >= 0) {
|
|
|
+ FSDirDeleteOp.incrDeletedFileCount(ret);
|
|
|
+ fsn.removeLeases(toRemoveUCFiles);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ // TODO
|
|
|
+ // If lease soft limit time is expired, recover the lease
|
|
|
+// fsn.recoverLeaseInternal(FSNamesystem.RecoverLeaseOp.CREATE_FILE, iip,
|
|
|
+// src, holder, clientMachine, false);
|
|
|
+ throw new FileAlreadyExistsException(src + " for client " +
|
|
|
+ clientMachine + " already exists");
|
|
|
+ }
|
|
|
}
|
|
|
+ fsn.checkFsObjectLimit();
|
|
|
+ paths = Resolver.resolve(tx, src);
|
|
|
+ Map.Entry<FlatINodesInPath, String> parent = FSDirMkdirOp
|
|
|
+ .createAncestorDirectories(tx, fsd, paths, permissions);
|
|
|
+ long newId = tx.allocateNewInodeId();
|
|
|
+ FlatINodeFileFeature.Builder fileFeatureBuilder = new FlatINodeFileFeature
|
|
|
+ .Builder()
|
|
|
+ .replication(replication)
|
|
|
+ .blockSize(blockSize)
|
|
|
+ .inConstruction(true)
|
|
|
+ .clientName(holder)
|
|
|
+ .clientMachine(clientMachine);
|
|
|
+
|
|
|
+ setNewINodeStoragePolicy(fsn.getBlockManager(), fileFeatureBuilder,
|
|
|
+ isLazyPersist);
|
|
|
+
|
|
|
+ int userId = tx.getStringId(permissions.getUserName());
|
|
|
+ FlatINode parentINode = parent.getKey().getLastINode();
|
|
|
+ int groupId = permissions.getGroupName() == null
|
|
|
+ ? parentINode.groupId()
|
|
|
+ : tx.getStringId(permissions.getGroupName());
|
|
|
+
|
|
|
+ FlatINodeFileFeature fileFeature = FlatINodeFileFeature.wrap(
|
|
|
+ fileFeatureBuilder.build());
|
|
|
+ ByteString b = new FlatINode.Builder()
|
|
|
+ .id(newId)
|
|
|
+ .type(FlatINode.Type.FILE)
|
|
|
+ .parentId(parentINode.id())
|
|
|
+ .mtime(now())
|
|
|
+ .userId(userId)
|
|
|
+ .groupId(groupId)
|
|
|
+ .permission(permissions.getPermission().toShort())
|
|
|
+ .addFeature(fileFeature)
|
|
|
+ .build();
|
|
|
+
|
|
|
+ FlatINode newNode = FlatINode.wrap(b);
|
|
|
+ // TODO: check .reserved path, quotas and ACL
|
|
|
+ byte[] localName = parent.getValue().getBytes(Charsets.UTF_8);
|
|
|
+ tx.putINode(newId, b);
|
|
|
+ tx.putChild(parentINode.id(), ByteBuffer.wrap(localName), newId);
|
|
|
+
|
|
|
+ ByteString newParent = new FlatINode.Builder().mergeFrom(parentINode)
|
|
|
+ .mtime(now()).build();
|
|
|
+ tx.putINode(parentINode.id(), newParent);
|
|
|
+
|
|
|
+ fsn.leaseManager.addLease(holder, newId);
|
|
|
+// if (feInfo != null) {
|
|
|
+// fsd.setFileEncryptionInfo(src, feInfo);
|
|
|
+// newNode = fsd.getInode(newNode.getId()).asFile();
|
|
|
+// }
|
|
|
+ tx.logOpenFile(fsd.ugid(), src, newNode, overwrite, logRetryEntry);
|
|
|
+ if (NameNode.stateChangeLog.isDebugEnabled()) {
|
|
|
+ NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: added " +
|
|
|
+ src + " inode " + newId + " " + holder);
|
|
|
+ }
|
|
|
+ tx.commit();
|
|
|
+ return FSDirStatAndListingOp.createFileStatus(
|
|
|
+ tx, fsd, newNode, localName, fileFeature.storagePolicyId());
|
|
|
}
|
|
|
- fsn.checkFsObjectLimit();
|
|
|
- INodeFile newNode = null;
|
|
|
- Map.Entry<INodesInPath, String> parent = FSDirMkdirOp
|
|
|
- .createAncestorDirectories(fsd, iip, permissions);
|
|
|
- if (parent != null) {
|
|
|
- iip = addFile(fsd, parent.getKey(), parent.getValue(), permissions,
|
|
|
- replication, blockSize, holder, clientMachine);
|
|
|
- newNode = iip != null ? iip.getLastINode().asFile() : null;
|
|
|
- }
|
|
|
- if (newNode == null) {
|
|
|
- throw new IOException("Unable to add " + src + " to namespace");
|
|
|
- }
|
|
|
- fsn.leaseManager.addLease(
|
|
|
- newNode.getFileUnderConstructionFeature().getClientName(),
|
|
|
- newNode.getId());
|
|
|
- if (feInfo != null) {
|
|
|
- fsd.setFileEncryptionInfo(src, feInfo);
|
|
|
- newNode = fsd.getInode(newNode.getId()).asFile();
|
|
|
- }
|
|
|
- setNewINodeStoragePolicy(fsn.getBlockManager(), newNode, iip,
|
|
|
- isLazyPersist);
|
|
|
- fsd.getEditLog().logOpenFile(src, newNode, overwrite, logRetryEntry);
|
|
|
- if (NameNode.stateChangeLog.isDebugEnabled()) {
|
|
|
- NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: added " +
|
|
|
- src + " inode " + newNode.getId() + " " + holder);
|
|
|
- }
|
|
|
- return FSDirStatAndListingOp.getFileInfo(fsd, src, false, isRawPath, true);
|
|
|
}
|
|
|
|
|
|
static EncryptionKeyInfo getEncryptionKeyInfo(FSNamesystem fsn,
|
|
@@ -518,7 +562,7 @@ class FSDirWriteFileOp {
|
|
|
|
|
|
// check quota limits and updated space consumed
|
|
|
fsd.updateCount(inodesInPath, 0, fileINode.getPreferredBlockSize(),
|
|
|
- fileINode.getPreferredBlockReplication(), true);
|
|
|
+ fileINode.getFileReplication(), true);
|
|
|
|
|
|
// associate new last block for the file
|
|
|
BlockInfoContiguousUnderConstruction blockInfo =
|
|
@@ -542,41 +586,6 @@ class FSDirWriteFileOp {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Add the given filename to the fs.
|
|
|
- * @return the new INodesInPath instance that contains the new INode
|
|
|
- */
|
|
|
- private static INodesInPath addFile(
|
|
|
- FSDirectory fsd, INodesInPath existing, String localName,
|
|
|
- PermissionStatus permissions, short replication, long preferredBlockSize,
|
|
|
- String clientName, String clientMachine)
|
|
|
- throws IOException {
|
|
|
-
|
|
|
- long modTime = now();
|
|
|
- INodeFile newNode = newINodeFile(fsd.allocateNewInodeId(), permissions,
|
|
|
- modTime, modTime, replication, preferredBlockSize);
|
|
|
- newNode.setLocalName(localName.getBytes(Charsets.UTF_8));
|
|
|
- newNode.toUnderConstruction(clientName, clientMachine);
|
|
|
-
|
|
|
- INodesInPath newiip;
|
|
|
- fsd.writeLock();
|
|
|
- try {
|
|
|
- newiip = fsd.addINode(existing, newNode);
|
|
|
- } finally {
|
|
|
- fsd.writeUnlock();
|
|
|
- }
|
|
|
- if (newiip == null) {
|
|
|
- NameNode.stateChangeLog.info("DIR* addFile: failed to add " +
|
|
|
- existing.getPath() + "/" + localName);
|
|
|
- return null;
|
|
|
- }
|
|
|
-
|
|
|
- if(NameNode.stateChangeLog.isDebugEnabled()) {
|
|
|
- NameNode.stateChangeLog.debug("DIR* addFile: " + localName + " is added");
|
|
|
- }
|
|
|
- return newiip;
|
|
|
- }
|
|
|
-
|
|
|
private static FileState analyzeFileState(
|
|
|
FSNamesystem fsn, String src, long fileId, String clientName,
|
|
|
ExtendedBlock previous, LocatedBlock[] onRetryBlock)
|
|
@@ -647,7 +656,7 @@ class FSDirWriteFileOp {
|
|
|
NameNode.stateChangeLog.debug(
|
|
|
"BLOCK* NameSystem.allocateBlock: handling block allocation" +
|
|
|
" writing to a file with a complete previous block: src=" +
|
|
|
- src + " lastBlock=" + lastBlockInFile);
|
|
|
+ src + " lastBlock=" + lastBlockInFile);
|
|
|
}
|
|
|
} else if (Block.matchingIdAndGenStamp(penultimateBlock, previousBlock)) {
|
|
|
if (lastBlockInFile.getNumBytes() != 0) {
|
|
@@ -672,7 +681,7 @@ class FSDirWriteFileOp {
|
|
|
// Case 3
|
|
|
throw new IOException("Cannot allocate block in " + src + ": " +
|
|
|
"passed 'previous' block " + previous + " does not match actual " +
|
|
|
- "last block in file " + lastBlockInFile);
|
|
|
+ "last block in file " + lastBlockInFile);
|
|
|
}
|
|
|
}
|
|
|
return new FileState(file, src, iip);
|
|
@@ -767,12 +776,6 @@ class FSDirWriteFileOp {
|
|
|
storagePolicyId);
|
|
|
}
|
|
|
|
|
|
- private static INodeFile newINodeFile(long id, PermissionStatus permissions,
|
|
|
- long mtime, long atime, short replication, long preferredBlockSize) {
|
|
|
- return newINodeFile(id, permissions, mtime, atime, replication,
|
|
|
- preferredBlockSize, (byte)0);
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* Persist the new block (the last block of the given file).
|
|
|
*/
|
|
@@ -809,8 +812,8 @@ class FSDirWriteFileOp {
|
|
|
DatanodeStorageInfo.incrementBlocksScheduled(targets);
|
|
|
}
|
|
|
|
|
|
- private static void setNewINodeStoragePolicy(BlockManager bm, INodeFile
|
|
|
- inode, INodesInPath iip, boolean isLazyPersist)
|
|
|
+ private static void setNewINodeStoragePolicy(
|
|
|
+ BlockManager bm, FlatINodeFileFeature.Builder file, boolean isLazyPersist)
|
|
|
throws IOException {
|
|
|
|
|
|
if (isLazyPersist) {
|
|
@@ -824,18 +827,17 @@ class FSDirWriteFileOp {
|
|
|
"The LAZY_PERSIST storage policy has been disabled " +
|
|
|
"by the administrator.");
|
|
|
}
|
|
|
- inode.setStoragePolicyID(lpPolicy.getId(),
|
|
|
- iip.getLatestSnapshotId());
|
|
|
+ file.storagePolicyId(lpPolicy.getId());
|
|
|
} else {
|
|
|
- BlockStoragePolicy effectivePolicy =
|
|
|
- bm.getStoragePolicy(inode.getStoragePolicyID());
|
|
|
-
|
|
|
- if (effectivePolicy != null &&
|
|
|
- effectivePolicy.isCopyOnCreateFile()) {
|
|
|
- // Copy effective policy from ancestor directory to current file.
|
|
|
- inode.setStoragePolicyID(effectivePolicy.getId(),
|
|
|
- iip.getLatestSnapshotId());
|
|
|
- }
|
|
|
+ // TODO: handle effective storage policy id
|
|
|
+// BlockStoragePolicy effectivePolicy =
|
|
|
+// bm.getStoragePolicy(parent.getLastINode().storagePolicyId());
|
|
|
+//
|
|
|
+// if (effectivePolicy != null &&
|
|
|
+// effectivePolicy.isCopyOnCreateFile()) {
|
|
|
+// // Copy effective policy from ancestor directory to current file.
|
|
|
+// file.storagePolicyId(effectivePolicy.getId());
|
|
|
+// }
|
|
|
}
|
|
|
}
|
|
|
|