|
@@ -45,6 +45,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
|
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
|
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
|
|
|
|
|
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
|
|
|
+import org.apache.hadoop.hdfs.protocol.BlockType;
|
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockUnderConstructionFeature;
|
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockUnderConstructionFeature;
|
|
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
|
|
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
|
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
|
@@ -119,7 +120,7 @@ class FSDirWriteFileOp {
|
|
FSNamesystem fsn = fsd.getFSNamesystem();
|
|
FSNamesystem fsn = fsd.getFSNamesystem();
|
|
final INodeFile file = fsn.checkLease(iip, holder, fileId);
|
|
final INodeFile file = fsn.checkLease(iip, holder, fileId);
|
|
Preconditions.checkState(file.isUnderConstruction());
|
|
Preconditions.checkState(file.isUnderConstruction());
|
|
- if (file.isStriped()) {
|
|
|
|
|
|
+ if (file.getBlockType() == BlockType.STRIPED) {
|
|
return; // do not abandon block for striped file
|
|
return; // do not abandon block for striped file
|
|
}
|
|
}
|
|
|
|
|
|
@@ -162,7 +163,7 @@ class FSDirWriteFileOp {
|
|
final short numTargets;
|
|
final short numTargets;
|
|
final byte storagePolicyID;
|
|
final byte storagePolicyID;
|
|
String clientMachine;
|
|
String clientMachine;
|
|
- final boolean isStriped;
|
|
|
|
|
|
+ final BlockType blockType;
|
|
|
|
|
|
INodesInPath iip = fsn.dir.resolvePath(pc, src, fileId);
|
|
INodesInPath iip = fsn.dir.resolvePath(pc, src, fileId);
|
|
FileState fileState = analyzeFileState(fsn, iip, fileId, clientName,
|
|
FileState fileState = analyzeFileState(fsn, iip, fileId, clientName,
|
|
@@ -186,9 +187,9 @@ class FSDirWriteFileOp {
|
|
blockSize = pendingFile.getPreferredBlockSize();
|
|
blockSize = pendingFile.getPreferredBlockSize();
|
|
clientMachine = pendingFile.getFileUnderConstructionFeature()
|
|
clientMachine = pendingFile.getFileUnderConstructionFeature()
|
|
.getClientMachine();
|
|
.getClientMachine();
|
|
- isStriped = pendingFile.isStriped();
|
|
|
|
|
|
+ blockType = pendingFile.getBlockType();
|
|
ErasureCodingPolicy ecPolicy = null;
|
|
ErasureCodingPolicy ecPolicy = null;
|
|
- if (isStriped) {
|
|
|
|
|
|
+ if (blockType == BlockType.STRIPED) {
|
|
ecPolicy = FSDirErasureCodingOp.getErasureCodingPolicy(fsn, src);
|
|
ecPolicy = FSDirErasureCodingOp.getErasureCodingPolicy(fsn, src);
|
|
numTargets = (short) (ecPolicy.getSchema().getNumDataUnits()
|
|
numTargets = (short) (ecPolicy.getSchema().getNumDataUnits()
|
|
+ ecPolicy.getSchema().getNumParityUnits());
|
|
+ ecPolicy.getSchema().getNumParityUnits());
|
|
@@ -197,7 +198,7 @@ class FSDirWriteFileOp {
|
|
}
|
|
}
|
|
storagePolicyID = pendingFile.getStoragePolicyID();
|
|
storagePolicyID = pendingFile.getStoragePolicyID();
|
|
return new ValidateAddBlockResult(blockSize, numTargets, storagePolicyID,
|
|
return new ValidateAddBlockResult(blockSize, numTargets, storagePolicyID,
|
|
- clientMachine, isStriped);
|
|
|
|
|
|
+ clientMachine, blockType);
|
|
}
|
|
}
|
|
|
|
|
|
static LocatedBlock makeLocatedBlock(FSNamesystem fsn, BlockInfo blk,
|
|
static LocatedBlock makeLocatedBlock(FSNamesystem fsn, BlockInfo blk,
|
|
@@ -237,7 +238,7 @@ class FSDirWriteFileOp {
|
|
// add new chosen targets to already allocated block and return
|
|
// add new chosen targets to already allocated block and return
|
|
BlockInfo lastBlockInFile = pendingFile.getLastBlock();
|
|
BlockInfo lastBlockInFile = pendingFile.getLastBlock();
|
|
lastBlockInFile.getUnderConstructionFeature().setExpectedLocations(
|
|
lastBlockInFile.getUnderConstructionFeature().setExpectedLocations(
|
|
- lastBlockInFile, targets, pendingFile.isStriped());
|
|
|
|
|
|
+ lastBlockInFile, targets, pendingFile.getBlockType());
|
|
offset = pendingFile.computeFileSize();
|
|
offset = pendingFile.computeFileSize();
|
|
return makeLocatedBlock(fsn, lastBlockInFile, targets, offset);
|
|
return makeLocatedBlock(fsn, lastBlockInFile, targets, offset);
|
|
}
|
|
}
|
|
@@ -248,11 +249,11 @@ class FSDirWriteFileOp {
|
|
ExtendedBlock.getLocalBlock(previous));
|
|
ExtendedBlock.getLocalBlock(previous));
|
|
|
|
|
|
// allocate new block, record block locations in INode.
|
|
// allocate new block, record block locations in INode.
|
|
- final boolean isStriped = pendingFile.isStriped();
|
|
|
|
|
|
+ final BlockType blockType = pendingFile.getBlockType();
|
|
// allocate new block, record block locations in INode.
|
|
// allocate new block, record block locations in INode.
|
|
- Block newBlock = fsn.createNewBlock(isStriped);
|
|
|
|
|
|
+ Block newBlock = fsn.createNewBlock(blockType);
|
|
INodesInPath inodesInPath = INodesInPath.fromINode(pendingFile);
|
|
INodesInPath inodesInPath = INodesInPath.fromINode(pendingFile);
|
|
- saveAllocatedBlock(fsn, src, inodesInPath, newBlock, targets, isStriped);
|
|
|
|
|
|
+ saveAllocatedBlock(fsn, src, inodesInPath, newBlock, targets, blockType);
|
|
|
|
|
|
persistNewBlock(fsn, src, pendingFile);
|
|
persistNewBlock(fsn, src, pendingFile);
|
|
offset = pendingFile.computeFileSize();
|
|
offset = pendingFile.computeFileSize();
|
|
@@ -282,7 +283,7 @@ class FSDirWriteFileOp {
|
|
return bm.chooseTarget4NewBlock(src, r.numTargets, clientNode,
|
|
return bm.chooseTarget4NewBlock(src, r.numTargets, clientNode,
|
|
excludedNodesSet, r.blockSize,
|
|
excludedNodesSet, r.blockSize,
|
|
favoredNodesList, r.storagePolicyID,
|
|
favoredNodesList, r.storagePolicyID,
|
|
- r.isStriped, flags);
|
|
|
|
|
|
+ r.blockType, flags);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -420,14 +421,16 @@ class FSDirWriteFileOp {
|
|
if (ecPolicy != null) {
|
|
if (ecPolicy != null) {
|
|
replication = ecPolicy.getId();
|
|
replication = ecPolicy.getId();
|
|
}
|
|
}
|
|
|
|
+ final BlockType blockType = ecPolicy != null?
|
|
|
|
+ BlockType.STRIPED : BlockType.CONTIGUOUS;
|
|
if (underConstruction) {
|
|
if (underConstruction) {
|
|
newNode = newINodeFile(id, permissions, modificationTime,
|
|
newNode = newINodeFile(id, permissions, modificationTime,
|
|
modificationTime, replication, preferredBlockSize, storagePolicyId,
|
|
modificationTime, replication, preferredBlockSize, storagePolicyId,
|
|
- ecPolicy != null);
|
|
|
|
|
|
+ blockType);
|
|
newNode.toUnderConstruction(clientName, clientMachine);
|
|
newNode.toUnderConstruction(clientName, clientMachine);
|
|
} else {
|
|
} else {
|
|
newNode = newINodeFile(id, permissions, modificationTime, atime,
|
|
newNode = newINodeFile(id, permissions, modificationTime, atime,
|
|
- replication, preferredBlockSize, storagePolicyId, ecPolicy != null);
|
|
|
|
|
|
+ replication, preferredBlockSize, storagePolicyId, blockType);
|
|
}
|
|
}
|
|
newNode.setLocalName(localName);
|
|
newNode.setLocalName(localName);
|
|
INodesInPath iip = fsd.addINode(existing, newNode,
|
|
INodesInPath iip = fsd.addINode(existing, newNode,
|
|
@@ -459,7 +462,7 @@ class FSDirWriteFileOp {
|
|
*/
|
|
*/
|
|
private static BlockInfo addBlock(FSDirectory fsd, String path,
|
|
private static BlockInfo addBlock(FSDirectory fsd, String path,
|
|
INodesInPath inodesInPath, Block block, DatanodeStorageInfo[] targets,
|
|
INodesInPath inodesInPath, Block block, DatanodeStorageInfo[] targets,
|
|
- boolean isStriped) throws IOException {
|
|
|
|
|
|
+ BlockType blockType) throws IOException {
|
|
fsd.writeLock();
|
|
fsd.writeLock();
|
|
try {
|
|
try {
|
|
final INodeFile fileINode = inodesInPath.getLastINode().asFile();
|
|
final INodeFile fileINode = inodesInPath.getLastINode().asFile();
|
|
@@ -467,7 +470,7 @@ class FSDirWriteFileOp {
|
|
|
|
|
|
// associate new last block for the file
|
|
// associate new last block for the file
|
|
final BlockInfo blockInfo;
|
|
final BlockInfo blockInfo;
|
|
- if (isStriped) {
|
|
|
|
|
|
+ if (blockType == BlockType.STRIPED) {
|
|
ErasureCodingPolicy ecPolicy = FSDirErasureCodingOp.getErasureCodingPolicy(
|
|
ErasureCodingPolicy ecPolicy = FSDirErasureCodingOp.getErasureCodingPolicy(
|
|
fsd.getFSNamesystem(), inodesInPath);
|
|
fsd.getFSNamesystem(), inodesInPath);
|
|
short numDataUnits = (short) ecPolicy.getNumDataUnits();
|
|
short numDataUnits = (short) ecPolicy.getNumDataUnits();
|
|
@@ -525,8 +528,10 @@ class FSDirWriteFileOp {
|
|
if (ecPolicy != null) {
|
|
if (ecPolicy != null) {
|
|
replication = ecPolicy.getId();
|
|
replication = ecPolicy.getId();
|
|
}
|
|
}
|
|
|
|
+ final BlockType blockType = ecPolicy != null?
|
|
|
|
+ BlockType.STRIPED : BlockType.CONTIGUOUS;
|
|
INodeFile newNode = newINodeFile(fsd.allocateNewInodeId(), permissions,
|
|
INodeFile newNode = newINodeFile(fsd.allocateNewInodeId(), permissions,
|
|
- modTime, modTime, replication, preferredBlockSize, ecPolicy != null);
|
|
|
|
|
|
+ modTime, modTime, replication, preferredBlockSize, blockType);
|
|
newNode.setLocalName(localName);
|
|
newNode.setLocalName(localName);
|
|
newNode.toUnderConstruction(clientName, clientMachine);
|
|
newNode.toUnderConstruction(clientName, clientMachine);
|
|
newiip = fsd.addINode(existing, newNode, permissions.getPermission());
|
|
newiip = fsd.addINode(existing, newNode, permissions.getPermission());
|
|
@@ -698,17 +703,17 @@ class FSDirWriteFileOp {
|
|
private static INodeFile newINodeFile(
|
|
private static INodeFile newINodeFile(
|
|
long id, PermissionStatus permissions, long mtime, long atime,
|
|
long id, PermissionStatus permissions, long mtime, long atime,
|
|
short replication, long preferredBlockSize, byte storagePolicyId,
|
|
short replication, long preferredBlockSize, byte storagePolicyId,
|
|
- boolean isStriped) {
|
|
|
|
|
|
+ BlockType blockType) {
|
|
return new INodeFile(id, null, permissions, mtime, atime,
|
|
return new INodeFile(id, null, permissions, mtime, atime,
|
|
BlockInfo.EMPTY_ARRAY, replication, preferredBlockSize,
|
|
BlockInfo.EMPTY_ARRAY, replication, preferredBlockSize,
|
|
- storagePolicyId, isStriped);
|
|
|
|
|
|
+ storagePolicyId, blockType);
|
|
}
|
|
}
|
|
|
|
|
|
private static INodeFile newINodeFile(long id, PermissionStatus permissions,
|
|
private static INodeFile newINodeFile(long id, PermissionStatus permissions,
|
|
long mtime, long atime, short replication, long preferredBlockSize,
|
|
long mtime, long atime, short replication, long preferredBlockSize,
|
|
- boolean isStriped) {
|
|
|
|
|
|
+ BlockType blockType) {
|
|
return newINodeFile(id, permissions, mtime, atime, replication,
|
|
return newINodeFile(id, permissions, mtime, atime, replication,
|
|
- preferredBlockSize, (byte)0, isStriped);
|
|
|
|
|
|
+ preferredBlockSize, (byte)0, blockType);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -738,10 +743,10 @@ class FSDirWriteFileOp {
|
|
*/
|
|
*/
|
|
private static void saveAllocatedBlock(FSNamesystem fsn, String src,
|
|
private static void saveAllocatedBlock(FSNamesystem fsn, String src,
|
|
INodesInPath inodesInPath, Block newBlock, DatanodeStorageInfo[] targets,
|
|
INodesInPath inodesInPath, Block newBlock, DatanodeStorageInfo[] targets,
|
|
- boolean isStriped) throws IOException {
|
|
|
|
|
|
+ BlockType blockType) throws IOException {
|
|
assert fsn.hasWriteLock();
|
|
assert fsn.hasWriteLock();
|
|
BlockInfo b = addBlock(fsn.dir, src, inodesInPath, newBlock, targets,
|
|
BlockInfo b = addBlock(fsn.dir, src, inodesInPath, newBlock, targets,
|
|
- isStriped);
|
|
|
|
|
|
+ blockType);
|
|
logAllocatedBlock(src, b);
|
|
logAllocatedBlock(src, b);
|
|
DatanodeStorageInfo.incrementBlocksScheduled(targets);
|
|
DatanodeStorageInfo.incrementBlocksScheduled(targets);
|
|
}
|
|
}
|
|
@@ -808,16 +813,16 @@ class FSDirWriteFileOp {
|
|
final int numTargets;
|
|
final int numTargets;
|
|
final byte storagePolicyID;
|
|
final byte storagePolicyID;
|
|
final String clientMachine;
|
|
final String clientMachine;
|
|
- final boolean isStriped;
|
|
|
|
|
|
+ final BlockType blockType;
|
|
|
|
|
|
ValidateAddBlockResult(
|
|
ValidateAddBlockResult(
|
|
long blockSize, int numTargets, byte storagePolicyID,
|
|
long blockSize, int numTargets, byte storagePolicyID,
|
|
- String clientMachine, boolean isStriped) {
|
|
|
|
|
|
+ String clientMachine, BlockType blockType) {
|
|
this.blockSize = blockSize;
|
|
this.blockSize = blockSize;
|
|
this.numTargets = numTargets;
|
|
this.numTargets = numTargets;
|
|
this.storagePolicyID = storagePolicyID;
|
|
this.storagePolicyID = storagePolicyID;
|
|
this.clientMachine = clientMachine;
|
|
this.clientMachine = clientMachine;
|
|
- this.isStriped = isStriped;
|
|
|
|
|
|
+ this.blockType = blockType;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|