|
@@ -48,6 +48,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
|
import org.apache.hadoop.hdfs.protocol.Block;
|
|
|
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
|
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
|
|
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
|
|
import org.apache.hadoop.hdfs.protocol.FSConstants;
|
|
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
|
|
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
|
|
@@ -767,7 +768,9 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
}
|
|
|
|
|
|
@Override // FSDatasetInterface
|
|
|
- public synchronized Block getStoredBlock(long blkid) throws IOException {
|
|
|
+ public synchronized Block getStoredBlock(String bpid, long blkid)
|
|
|
+ throws IOException {
|
|
|
+ // TODO:FEDERATION use extended block
|
|
|
File blockfile = findBlockFile(blkid);
|
|
|
if (blockfile == null) {
|
|
|
return null;
|
|
@@ -803,20 +806,21 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
}
|
|
|
|
|
|
@Override // FSDatasetInterface
|
|
|
- public boolean metaFileExists(Block b) throws IOException {
|
|
|
- return getMetaFile(b).exists();
|
|
|
+ public boolean metaFileExists(ExtendedBlock b) throws IOException {
|
|
|
+ // TODO:FEDERATION use ExtendedBlock
|
|
|
+ return getMetaFile(b.getLocalBlock()).exists();
|
|
|
}
|
|
|
|
|
|
@Override // FSDatasetInterface
|
|
|
- public long getMetaDataLength(Block b) throws IOException {
|
|
|
- File checksumFile = getMetaFile( b );
|
|
|
+ public long getMetaDataLength(ExtendedBlock b) throws IOException {
|
|
|
+ File checksumFile = getMetaFile(b.getLocalBlock());
|
|
|
return checksumFile.length();
|
|
|
}
|
|
|
|
|
|
@Override // FSDatasetInterface
|
|
|
- public MetaDataInputStream getMetaDataInputStream(Block b)
|
|
|
+ public MetaDataInputStream getMetaDataInputStream(ExtendedBlock b)
|
|
|
throws IOException {
|
|
|
- File checksumFile = getMetaFile( b );
|
|
|
+ File checksumFile = getMetaFile(b.getLocalBlock());
|
|
|
return new MetaDataInputStream(new FileInputStream(checksumFile),
|
|
|
checksumFile.length());
|
|
|
}
|
|
@@ -924,8 +928,8 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
* Find the block's on-disk length
|
|
|
*/
|
|
|
@Override // FSDatasetInterface
|
|
|
- public long getLength(Block b) throws IOException {
|
|
|
- return getBlockFile(b).length();
|
|
|
+ public long getLength(ExtendedBlock b) throws IOException {
|
|
|
+ return getBlockFile(b.getLocalBlock()).length();
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -943,14 +947,15 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
}
|
|
|
|
|
|
@Override // FSDatasetInterface
|
|
|
- public synchronized InputStream getBlockInputStream(Block b) throws IOException {
|
|
|
- return new FileInputStream(getBlockFile(b));
|
|
|
+ public synchronized InputStream getBlockInputStream(ExtendedBlock b)
|
|
|
+ throws IOException {
|
|
|
+ return new FileInputStream(getBlockFile(b.getLocalBlock()));
|
|
|
}
|
|
|
|
|
|
@Override // FSDatasetInterface
|
|
|
- public synchronized InputStream getBlockInputStream(Block b, long seekOffset) throws IOException {
|
|
|
-
|
|
|
- File blockFile = getBlockFile(b);
|
|
|
+ public synchronized InputStream getBlockInputStream(ExtendedBlock b,
|
|
|
+ long seekOffset) throws IOException {
|
|
|
+ File blockFile = getBlockFile(b.getLocalBlock());
|
|
|
RandomAccessFile blockInFile = new RandomAccessFile(blockFile, "r");
|
|
|
if (seekOffset > 0) {
|
|
|
blockInFile.seek(seekOffset);
|
|
@@ -977,10 +982,9 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
* Returns handles to the block file and its metadata file
|
|
|
*/
|
|
|
@Override // FSDatasetInterface
|
|
|
- public synchronized BlockInputStreams getTmpInputStreams(Block b,
|
|
|
+ public synchronized BlockInputStreams getTmpInputStreams(ExtendedBlock b,
|
|
|
long blkOffset, long ckoff) throws IOException {
|
|
|
-
|
|
|
- ReplicaInfo info = getReplicaInfo(b);
|
|
|
+ ReplicaInfo info = getReplicaInfo(b.getLocalBlock());
|
|
|
File blockFile = info.getBlockFile();
|
|
|
RandomAccessFile blockInFile = new RandomAccessFile(blockFile, "r");
|
|
|
if (blkOffset > 0) {
|
|
@@ -1080,7 +1084,7 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
}
|
|
|
|
|
|
@Override // FSDatasetInterface
|
|
|
- public synchronized ReplicaInPipelineInterface append(Block b,
|
|
|
+ public synchronized ReplicaInPipelineInterface append(ExtendedBlock b,
|
|
|
long newGS, long expectedBlockLen) throws IOException {
|
|
|
// If the block was successfully finalized because all packets
|
|
|
// were successfully processed at the Datanode but the ack for
|
|
@@ -1093,7 +1097,8 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
throw new IOException("The new generation stamp " + newGS +
|
|
|
" should be greater than the replica " + b + "'s generation stamp");
|
|
|
}
|
|
|
- ReplicaInfo replicaInfo = volumeMap.get(b);
|
|
|
+ // TODO:FEDERATION use ExtendedBlock
|
|
|
+ ReplicaInfo replicaInfo = volumeMap.get(b.getLocalBlock());
|
|
|
if (replicaInfo == null) {
|
|
|
throw new ReplicaNotFoundException(
|
|
|
ReplicaNotFoundException.NON_EXISTENT_REPLICA + b);
|
|
@@ -1225,11 +1230,13 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
return replicaInfo;
|
|
|
}
|
|
|
@Override // FSDatasetInterface
|
|
|
- public synchronized ReplicaInPipelineInterface recoverAppend(Block b,
|
|
|
+ public synchronized ReplicaInPipelineInterface recoverAppend(ExtendedBlock b,
|
|
|
long newGS, long expectedBlockLen) throws IOException {
|
|
|
DataNode.LOG.info("Recover failed append to " + b);
|
|
|
|
|
|
- ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
|
|
|
+ // TODO:FEDERATION use ExtendedBlock
|
|
|
+ ReplicaInfo replicaInfo = recoverCheck(b.getLocalBlock(), newGS,
|
|
|
+ expectedBlockLen);
|
|
|
|
|
|
// change the replica's state/gs etc.
|
|
|
if (replicaInfo.getState() == ReplicaState.FINALIZED ) {
|
|
@@ -1241,16 +1248,17 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
}
|
|
|
|
|
|
@Override // FSDatasetInterface
|
|
|
- public void recoverClose(Block b, long newGS,
|
|
|
+ public void recoverClose(ExtendedBlock b, long newGS,
|
|
|
long expectedBlockLen) throws IOException {
|
|
|
DataNode.LOG.info("Recover failed close " + b);
|
|
|
// check replica's state
|
|
|
- ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
|
|
|
+ ReplicaInfo replicaInfo = recoverCheck(b.getLocalBlock(), newGS,
|
|
|
+ expectedBlockLen);
|
|
|
// bump the replica's GS
|
|
|
bumpReplicaGS(replicaInfo, newGS);
|
|
|
// finalize the replica if RBW
|
|
|
if (replicaInfo.getState() == ReplicaState.RBW) {
|
|
|
- finalizeBlock(replicaInfo);
|
|
|
+ finalizeReplica(replicaInfo);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -1282,7 +1290,7 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
}
|
|
|
|
|
|
@Override // FSDatasetInterface
|
|
|
- public synchronized ReplicaInPipelineInterface createRbw(Block b)
|
|
|
+ public synchronized ReplicaInPipelineInterface createRbw(ExtendedBlock b)
|
|
|
throws IOException {
|
|
|
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockId());
|
|
|
if (replicaInfo != null) {
|
|
@@ -1293,7 +1301,7 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
// create a new block
|
|
|
FSVolume v = volumes.getNextVolume(b.getNumBytes());
|
|
|
// create a rbw file to hold block in the designated volume
|
|
|
- File f = v.createRbwFile(b);
|
|
|
+ File f = v.createRbwFile(b.getLocalBlock());
|
|
|
ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(),
|
|
|
b.getGenerationStamp(), v, f.getParentFile());
|
|
|
volumeMap.add(newReplicaInfo);
|
|
@@ -1301,7 +1309,7 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
}
|
|
|
|
|
|
@Override // FSDatasetInterface
|
|
|
- public synchronized ReplicaInPipelineInterface recoverRbw(Block b,
|
|
|
+ public synchronized ReplicaInPipelineInterface recoverRbw(ExtendedBlock b,
|
|
|
long newGS, long minBytesRcvd, long maxBytesRcvd)
|
|
|
throws IOException {
|
|
|
DataNode.LOG.info("Recover the RBW replica " + b);
|
|
@@ -1350,7 +1358,7 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
}
|
|
|
|
|
|
@Override // FSDatasetInterface
|
|
|
- public synchronized ReplicaInPipelineInterface createTemporary(Block b)
|
|
|
+ public synchronized ReplicaInPipelineInterface createTemporary(ExtendedBlock b)
|
|
|
throws IOException {
|
|
|
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockId());
|
|
|
if (replicaInfo != null) {
|
|
@@ -1361,7 +1369,7 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
|
|
|
FSVolume v = volumes.getNextVolume(b.getNumBytes());
|
|
|
// create a temporary file to hold block in the designated volume
|
|
|
- File f = v.createTmpFile(b);
|
|
|
+ File f = v.createTmpFile(b.getLocalBlock());
|
|
|
ReplicaInPipeline newReplicaInfo = new ReplicaInPipeline(b.getBlockId(),
|
|
|
b.getGenerationStamp(), v, f.getParentFile());
|
|
|
volumeMap.add(newReplicaInfo);
|
|
@@ -1374,7 +1382,7 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
* last checksum will be overwritten.
|
|
|
*/
|
|
|
@Override // FSDatasetInterface
|
|
|
- public void adjustCrcChannelPosition(Block b, BlockWriteStreams streams,
|
|
|
+ public void adjustCrcChannelPosition(ExtendedBlock b, BlockWriteStreams streams,
|
|
|
int checksumSize) throws IOException {
|
|
|
FileOutputStream file = (FileOutputStream) streams.checksumOut;
|
|
|
FileChannel channel = file.getChannel();
|
|
@@ -1407,8 +1415,8 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
* Complete the block write!
|
|
|
*/
|
|
|
@Override // FSDatasetInterface
|
|
|
- public synchronized void finalizeBlock(Block b) throws IOException {
|
|
|
- ReplicaInfo replicaInfo = getReplicaInfo(b);
|
|
|
+ public synchronized void finalizeBlock(ExtendedBlock b) throws IOException {
|
|
|
+ ReplicaInfo replicaInfo = getReplicaInfo(b.getLocalBlock());
|
|
|
if (replicaInfo.getState() == ReplicaState.FINALIZED) {
|
|
|
// this is legal, when recovery happens on a file that has
|
|
|
// been opened for append but never modified
|
|
@@ -1444,15 +1452,15 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
* Remove the temporary block file (if any)
|
|
|
*/
|
|
|
@Override // FSDatasetInterface
|
|
|
- public synchronized void unfinalizeBlock(Block b) throws IOException {
|
|
|
- ReplicaInfo replicaInfo = volumeMap.get(b);
|
|
|
+ public synchronized void unfinalizeBlock(ExtendedBlock b) throws IOException {
|
|
|
+ ReplicaInfo replicaInfo = volumeMap.get(b.getLocalBlock());
|
|
|
if (replicaInfo != null && replicaInfo.getState() == ReplicaState.TEMPORARY) {
|
|
|
// remove from volumeMap
|
|
|
- volumeMap.remove(b);
|
|
|
+ volumeMap.remove(b.getLocalBlock());
|
|
|
|
|
|
// delete the on-disk temp file
|
|
|
if (delBlockFromDisk(replicaInfo.getBlockFile(),
|
|
|
- replicaInfo.getMetaFile(), b)) {
|
|
|
+ replicaInfo.getMetaFile(), b.getLocalBlock())) {
|
|
|
DataNode.LOG.warn("Block " + b + " unfinalized and removed. " );
|
|
|
}
|
|
|
}
|
|
@@ -1550,8 +1558,8 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
* valid means finalized
|
|
|
*/
|
|
|
@Override // FSDatasetInterface
|
|
|
- public boolean isValidBlock(Block b) {
|
|
|
- ReplicaInfo replicaInfo = volumeMap.get(b);
|
|
|
+ public boolean isValidBlock(ExtendedBlock b) {
|
|
|
+ ReplicaInfo replicaInfo = volumeMap.get(b.getLocalBlock());
|
|
|
if (replicaInfo == null ||
|
|
|
replicaInfo.getState() != ReplicaState.FINALIZED) {
|
|
|
return false;
|
|
@@ -1609,7 +1617,7 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
* just get rid of it.
|
|
|
*/
|
|
|
@Override // FSDatasetInterface
|
|
|
- public void invalidate(Block invalidBlks[]) throws IOException {
|
|
|
+ public void invalidate(String bpid, Block invalidBlks[]) throws IOException {
|
|
|
boolean error = false;
|
|
|
for (int i = 0; i < invalidBlks.length; i++) {
|
|
|
File f = null;
|
|
@@ -1850,7 +1858,8 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
// Remove the block from volumeMap
|
|
|
volumeMap.remove(blockId);
|
|
|
if (datanode.blockScanner != null) {
|
|
|
- datanode.blockScanner.deleteBlock(new Block(blockId));
|
|
|
+ // TODO:FEDERATION pass the right bpid
|
|
|
+ datanode.blockScanner.deleteBlock("TODO", new Block(blockId));
|
|
|
}
|
|
|
DataNode.LOG.warn("Removed block " + blockId
|
|
|
+ " from memory with missing block file on the disk");
|
|
@@ -1872,7 +1881,7 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
diskFile.length(), diskGS, vol, diskFile.getParentFile());
|
|
|
volumeMap.add(diskBlockInfo);
|
|
|
if (datanode.blockScanner != null) {
|
|
|
- datanode.blockScanner.addBlock(diskBlockInfo);
|
|
|
+ datanode.blockScanner.addBlock(new ExtendedBlock(diskBlockInfo));
|
|
|
}
|
|
|
DataNode.LOG.warn("Added missing block to memory " + (Block)diskBlockInfo);
|
|
|
return;
|
|
@@ -2041,7 +2050,7 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
|
|
|
@Override // FSDatasetInterface
|
|
|
public synchronized ReplicaInfo updateReplicaUnderRecovery(
|
|
|
- final Block oldBlock,
|
|
|
+ final ExtendedBlock oldBlock,
|
|
|
final long recoveryId,
|
|
|
final long newlength) throws IOException {
|
|
|
//get replica
|
|
@@ -2112,7 +2121,7 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
}
|
|
|
|
|
|
@Override // FSDatasetInterface
|
|
|
- public synchronized long getReplicaVisibleLength(final Block block)
|
|
|
+ public synchronized long getReplicaVisibleLength(final ExtendedBlock block)
|
|
|
throws IOException {
|
|
|
final Replica replica = volumeMap.get(block.getBlockId());
|
|
|
if (replica == null) {
|