|
@@ -36,6 +36,7 @@ import org.apache.hadoop.util.StringUtils;
|
|
|
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
|
|
|
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
|
|
|
import org.apache.hadoop.conf.*;
|
|
|
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
|
|
|
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
|
|
|
|
|
@@ -105,8 +106,9 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
if ( ! metaData.renameTo( newmeta ) ||
|
|
|
! src.renameTo( dest ) ) {
|
|
|
throw new IOException( "could not move files for " + b +
|
|
|
- " from tmp to " +
|
|
|
- dest.getAbsolutePath() );
|
|
|
+ " from " + src + " to " +
|
|
|
+ dest.getAbsolutePath() + " or from"
|
|
|
+ + metaData + " to " + newmeta);
|
|
|
}
|
|
|
if (DataNode.LOG.isDebugEnabled()) {
|
|
|
DataNode.LOG.debug("addBlock: Moved " + metaData + " to " + newmeta);
|
|
@@ -171,7 +173,7 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
return Block.GRANDFATHER_GENERATION_STAMP;
|
|
|
}
|
|
|
|
|
|
- void getVolumeMap(HashMap<Block, ReplicaInfo> volumeMap, FSVolume volume) {
|
|
|
+ void getVolumeMap(ReplicasMap volumeMap, FSVolume volume) {
|
|
|
if (children != null) {
|
|
|
for (int i = 0; i < children.length; i++) {
|
|
|
children[i].getVolumeMap(volumeMap, volume);
|
|
@@ -179,11 +181,18 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
}
|
|
|
|
|
|
File blockFiles[] = dir.listFiles();
|
|
|
- for (int i = 0; i < blockFiles.length; i++) {
|
|
|
- if (Block.isBlockFilename(blockFiles[i])) {
|
|
|
- long genStamp = getGenerationStampFromFile(blockFiles, blockFiles[i]);
|
|
|
- volumeMap.put(new Block(blockFiles[i], blockFiles[i].length(), genStamp),
|
|
|
- new ReplicaInfo(volume, blockFiles[i]));
|
|
|
+ for (File blockFile : blockFiles) {
|
|
|
+ if (Block.isBlockFilename(blockFile)) {
|
|
|
+ long genStamp = getGenerationStampFromFile(blockFiles, blockFile);
|
|
|
+ long blockId = Block.filename2id(blockFile.getName());
|
|
|
+ ReplicaInfo oldReplica = volumeMap.add(
|
|
|
+ new FinalizedReplica(blockId, blockFile.length(), genStamp,
|
|
|
+ volume, blockFile.getParentFile()));
|
|
|
+ if (oldReplica != null) {
|
|
|
+ DataNode.LOG.warn("Two block files have the same block id exits " +
|
|
|
+ "on disk: " + oldReplica.getBlockFile() +
|
|
|
+ " and " + blockFile );
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -403,7 +412,7 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
DiskChecker.checkDir(tmpDir);
|
|
|
}
|
|
|
|
|
|
- void getVolumeMap(HashMap<Block, ReplicaInfo> volumeMap) {
|
|
|
+ void getVolumeMap(ReplicasMap volumeMap) {
|
|
|
dataDir.getVolumeMap(volumeMap, this);
|
|
|
}
|
|
|
|
|
@@ -496,7 +505,7 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
return remaining;
|
|
|
}
|
|
|
|
|
|
- synchronized void getVolumeMap(HashMap<Block, ReplicaInfo> volumeMap) {
|
|
|
+ synchronized void getVolumeMap(ReplicasMap volumeMap) {
|
|
|
for (int idx = 0; idx < volumes.length; idx++) {
|
|
|
volumes[idx].getVolumeMap(volumeMap);
|
|
|
}
|
|
@@ -538,24 +547,6 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
public static final short METADATA_VERSION = 1;
|
|
|
|
|
|
|
|
|
- static class ActiveFile {
|
|
|
- final File file;
|
|
|
- final List<Thread> threads = new ArrayList<Thread>(2);
|
|
|
-
|
|
|
- ActiveFile(File f, List<Thread> list) {
|
|
|
- file = f;
|
|
|
- if (list != null) {
|
|
|
- threads.addAll(list);
|
|
|
- }
|
|
|
- threads.add(Thread.currentThread());
|
|
|
- }
|
|
|
-
|
|
|
- public String toString() {
|
|
|
- return getClass().getSimpleName() + "(file=" + file
|
|
|
- + ", threads=" + threads + ")";
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
static String getMetaFileName(String blockFileName, long genStamp) {
|
|
|
return blockFileName + "_" + genStamp + METADATA_EXTENSION;
|
|
|
}
|
|
@@ -605,22 +596,7 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
|
|
|
/** Return the block file for the given ID */
|
|
|
public File findBlockFile(long blockId) {
|
|
|
- final Block b = new Block(blockId);
|
|
|
- File blockfile = null;
|
|
|
- ActiveFile activefile = ongoingCreates.get(b);
|
|
|
- if (activefile != null) {
|
|
|
- blockfile = activefile.file;
|
|
|
- }
|
|
|
- if (blockfile == null) {
|
|
|
- blockfile = getFile(b);
|
|
|
- }
|
|
|
- if (blockfile == null) {
|
|
|
- if (DataNode.LOG.isDebugEnabled()) {
|
|
|
- DataNode.LOG.debug("ongoingCreates=" + ongoingCreates);
|
|
|
- DataNode.LOG.debug("volumeMap=" + volumeMap);
|
|
|
- }
|
|
|
- }
|
|
|
- return blockfile;
|
|
|
+ return getFile(blockId);
|
|
|
}
|
|
|
|
|
|
/** {@inheritDoc} */
|
|
@@ -651,9 +627,8 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
}
|
|
|
|
|
|
FSVolumeSet volumes;
|
|
|
- private HashMap<Block,ActiveFile> ongoingCreates = new HashMap<Block,ActiveFile>();
|
|
|
private int maxBlocksPerDir = 0;
|
|
|
- HashMap<Block,ReplicaInfo> volumeMap = null;
|
|
|
+ ReplicasMap volumeMap = new ReplicasMap();
|
|
|
static Random random = new Random();
|
|
|
|
|
|
// Used for synchronizing access to usage stats
|
|
@@ -669,7 +644,6 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
volArray[idx] = new FSVolume(storage.getStorageDir(idx).getCurrentDir(), conf);
|
|
|
}
|
|
|
volumes = new FSVolumeSet(volArray);
|
|
|
- volumeMap = new HashMap<Block, ReplicaInfo>();
|
|
|
volumes.getVolumeMap(volumeMap);
|
|
|
registerMBean(storage.getStorageID());
|
|
|
}
|
|
@@ -737,15 +711,27 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Returns handles to the block file and its metadata file
|
|
|
+ * Get the meta info of a block stored in volumeMap
|
|
|
+ * @param b block
|
|
|
+ * @return the replica's meta information
|
|
|
+ * @throws IOException if no entry is in the map or
|
|
|
+ * there is a generation stamp mismatch
|
|
|
*/
|
|
|
- public synchronized BlockInputStreams getTmpInputStreams(Block b,
|
|
|
- long blkOffset, long ckoff) throws IOException {
|
|
|
-
|
|
|
+ private ReplicaInfo getReplicaInfo(Block b) throws IOException {
|
|
|
ReplicaInfo info = volumeMap.get(b);
|
|
|
if (info == null) {
|
|
|
throw new IOException("Block " + b + " does not exist in volumeMap.");
|
|
|
}
|
|
|
+ return info;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Returns handles to the block file and its metadata file
|
|
|
+ */
|
|
|
+ public synchronized BlockInputStreams getTmpInputStreams(Block b,
|
|
|
+ long blkOffset, long ckoff) throws IOException {
|
|
|
+
|
|
|
+ ReplicaInfo info = getReplicaInfo(b);
|
|
|
FSVolume v = info.getVolume();
|
|
|
File blockFile = v.getTmpFile(b);
|
|
|
RandomAccessFile blockInFile = new RandomAccessFile(blockFile, "r");
|
|
@@ -774,23 +760,16 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
* @param block Block
|
|
|
* @param numLinks Detach if the number of links exceed this value
|
|
|
* @throws IOException
|
|
|
- * @return - true if the specified block was detached
|
|
|
+ * @return - true if the specified block was detached or the block
|
|
|
+ * is not in any snapshot.
|
|
|
*/
|
|
|
public boolean detachBlock(Block block, int numLinks) throws IOException {
|
|
|
ReplicaInfo info = null;
|
|
|
|
|
|
synchronized (this) {
|
|
|
- info = volumeMap.get(block);
|
|
|
- }
|
|
|
- return info.detachBlock(block, numLinks);
|
|
|
- }
|
|
|
-
|
|
|
- static private <T> void updateBlockMap(Map<Block, T> blockmap,
|
|
|
- Block oldblock, Block newblock) throws IOException {
|
|
|
- if (blockmap.containsKey(oldblock)) {
|
|
|
- T value = blockmap.remove(oldblock);
|
|
|
- blockmap.put(newblock, value);
|
|
|
+ info = getReplicaInfo(block);
|
|
|
}
|
|
|
+ return info.detachBlock(numLinks);
|
|
|
}
|
|
|
|
|
|
/** {@inheritDoc} */
|
|
@@ -822,18 +801,24 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
|
|
|
/**
|
|
|
* Try to update an old block to a new block.
|
|
|
- * If there are ongoing create threads running for the old block,
|
|
|
+ * If there are write threads running for the old block,
|
|
|
* the threads will be returned without updating the block.
|
|
|
*
|
|
|
- * @return ongoing create threads if there is any. Otherwise, return null.
|
|
|
+ * @return write threads if there are any. Otherwise, return null.
|
|
|
*/
|
|
|
private synchronized List<Thread> tryUpdateBlock(
|
|
|
Block oldblock, Block newblock) throws IOException {
|
|
|
- //check ongoing create threads
|
|
|
- final ActiveFile activefile = ongoingCreates.get(oldblock);
|
|
|
- if (activefile != null && !activefile.threads.isEmpty()) {
|
|
|
+ //check write threads
|
|
|
+ final ReplicaInfo replicaInfo = volumeMap.get(oldblock.getBlockId());
|
|
|
+ File blockFile = replicaInfo==null?null:replicaInfo.getBlockFile();
|
|
|
+ if (blockFile == null) {
|
|
|
+ throw new IOException("Block " + oldblock + " does not exist.");
|
|
|
+ }
|
|
|
+
|
|
|
+ if (replicaInfo instanceof ReplicaInPipeline) {
|
|
|
+ List<Thread> threads = ((ReplicaInPipeline)replicaInfo).getThreads();
|
|
|
//remove dead threads
|
|
|
- for(Iterator<Thread> i = activefile.threads.iterator(); i.hasNext(); ) {
|
|
|
+ for(Iterator<Thread> i = threads.iterator(); i.hasNext(); ) {
|
|
|
final Thread t = i.next();
|
|
|
if (!t.isAlive()) {
|
|
|
i.remove();
|
|
@@ -841,19 +826,14 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
}
|
|
|
|
|
|
//return living threads
|
|
|
- if (!activefile.threads.isEmpty()) {
|
|
|
- return new ArrayList<Thread>(activefile.threads);
|
|
|
+ if (!threads.isEmpty()) {
|
|
|
+ return new ArrayList<Thread>(threads);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
//No ongoing create threads is alive. Update block.
|
|
|
- File blockFile = findBlockFile(oldblock.getBlockId());
|
|
|
- if (blockFile == null) {
|
|
|
- throw new IOException("Block " + oldblock + " does not exist.");
|
|
|
- }
|
|
|
-
|
|
|
- File oldMetaFile = findMetaFile(blockFile);
|
|
|
- long oldgs = parseGenerationStamp(blockFile, oldMetaFile);
|
|
|
+ File oldMetaFile = replicaInfo.getMetaFile();
|
|
|
+ long oldgs = replicaInfo.getGenerationStamp();
|
|
|
|
|
|
//rename meta file to a tmp file
|
|
|
File tmpMetaFile = new File(oldMetaFile.getParent(),
|
|
@@ -863,7 +843,7 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
}
|
|
|
|
|
|
//update generation stamp
|
|
|
- if (oldgs > newblock.getGenerationStamp()) {
|
|
|
+ if (oldgs >= newblock.getGenerationStamp()) {
|
|
|
throw new IOException("Cannot update block (id=" + newblock.getBlockId()
|
|
|
+ ") generation stamp from " + oldgs
|
|
|
+ " to " + newblock.getGenerationStamp());
|
|
@@ -878,15 +858,16 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
truncateBlock(blockFile, tmpMetaFile, oldblock.getNumBytes(), newblock.getNumBytes());
|
|
|
}
|
|
|
|
|
|
+ // update replicaInfo
|
|
|
+ replicaInfo.setGenerationStamp(newblock.getGenerationStamp());
|
|
|
+ replicaInfo.setNumBytes(newblock.getNumBytes());
|
|
|
+
|
|
|
//rename the tmp file to the new meta file (with new generation stamp)
|
|
|
- File newMetaFile = getMetaFile(blockFile, newblock);
|
|
|
+ File newMetaFile = replicaInfo.getMetaFile();
|
|
|
if (!tmpMetaFile.renameTo(newMetaFile)) {
|
|
|
throw new IOException("Cannot rename tmp meta file to " + newMetaFile);
|
|
|
}
|
|
|
|
|
|
- updateBlockMap(ongoingCreates, oldblock, newblock);
|
|
|
- updateBlockMap(volumeMap, oldblock, newblock);
|
|
|
-
|
|
|
// paranoia! verify that the contents of the stored block
|
|
|
// matches the block file on disk.
|
|
|
validateBlockMetadata(newblock);
|
|
@@ -964,6 +945,7 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
//
|
|
|
// Make sure the block isn't a valid one - we're still creating it!
|
|
|
//
|
|
|
+ ReplicaInfo replicaInfo = volumeMap.get(b);
|
|
|
if (isValidBlock(b)) {
|
|
|
if (!isRecovery) {
|
|
|
throw new BlockAlreadyExistsException("Block " + b + " is valid, and cannot be written to.");
|
|
@@ -973,7 +955,9 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
// some of the packets were not received by the client. The client
|
|
|
// re-opens the connection and retries sending those packets.
|
|
|
// The other reason is that an "append" is occurring to this block.
|
|
|
- detachBlock(b, 1);
|
|
|
+ if (replicaInfo != null) {
|
|
|
+ replicaInfo.detachBlock(1);
|
|
|
+ }
|
|
|
}
|
|
|
long blockSize = b.getNumBytes();
|
|
|
|
|
@@ -986,10 +970,9 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
//
|
|
|
// Is it already in the create process?
|
|
|
//
|
|
|
- ActiveFile activeFile = ongoingCreates.get(b);
|
|
|
- if (activeFile != null) {
|
|
|
- f = activeFile.file;
|
|
|
- threads = activeFile.threads;
|
|
|
+ if (replicaInfo != null && replicaInfo instanceof ReplicaInPipeline) {
|
|
|
+ f = replicaInfo.getBlockFile();
|
|
|
+ threads = ((ReplicaInPipeline)replicaInfo).getThreads();
|
|
|
|
|
|
if (!isRecovery) {
|
|
|
throw new BlockAlreadyExistsException("Block " + b +
|
|
@@ -999,27 +982,27 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
thread.interrupt();
|
|
|
}
|
|
|
}
|
|
|
- ongoingCreates.remove(b);
|
|
|
}
|
|
|
FSVolume v = null;
|
|
|
- if (!isRecovery) {
|
|
|
+ if (!isRecovery) { // create a new block
|
|
|
v = volumes.getNextVolume(blockSize);
|
|
|
// create temporary file to hold block in the designated volume
|
|
|
f = createTmpFile(v, b);
|
|
|
- volumeMap.put(b, new ReplicaInfo(v));
|
|
|
+ replicaInfo = new ReplicaInPipeline(b.getBlockId(),
|
|
|
+ b.getGenerationStamp(), v, f.getParentFile());
|
|
|
+ volumeMap.add(replicaInfo);
|
|
|
} else if (f != null) {
|
|
|
DataNode.LOG.info("Reopen already-open Block for append " + b);
|
|
|
- // create or reuse temporary file to hold block in the designated volume
|
|
|
- v = volumeMap.get(b).getVolume();
|
|
|
- volumeMap.put(b, new ReplicaInfo(v));
|
|
|
} else {
|
|
|
// reopening block for appending to it.
|
|
|
DataNode.LOG.info("Reopen Block for append " + b);
|
|
|
- v = volumeMap.get(b).getVolume();
|
|
|
+ v = replicaInfo.getVolume();
|
|
|
f = createTmpFile(v, b);
|
|
|
- File blkfile = getBlockFile(b);
|
|
|
- File oldmeta = getMetaFile(b);
|
|
|
- File newmeta = getMetaFile(f, b);
|
|
|
+ File blkfile = replicaInfo.getBlockFile();
|
|
|
+ File oldmeta = replicaInfo.getMetaFile();
|
|
|
+ replicaInfo = new ReplicaInPipeline(replicaInfo,
|
|
|
+ v, f.getParentFile(), threads);
|
|
|
+ File newmeta = replicaInfo.getMetaFile();
|
|
|
|
|
|
// rename meta file to tmp directory
|
|
|
DataNode.LOG.debug("Renaming " + oldmeta + " to " + newmeta);
|
|
@@ -1042,7 +1025,7 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
" to tmp dir " + f);
|
|
|
}
|
|
|
}
|
|
|
- volumeMap.put(b, new ReplicaInfo(v));
|
|
|
+ volumeMap.add(replicaInfo);
|
|
|
}
|
|
|
if (f == null) {
|
|
|
DataNode.LOG.warn("Block " + b + " reopen failed " +
|
|
@@ -1050,7 +1033,6 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
throw new IOException("Block " + b + " reopen failed " +
|
|
|
" Unable to locate tmp file.");
|
|
|
}
|
|
|
- ongoingCreates.put(b, new ActiveFile(f, threads));
|
|
|
}
|
|
|
|
|
|
try {
|
|
@@ -1093,7 +1075,7 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
throws IOException {
|
|
|
long size = 0;
|
|
|
synchronized (this) {
|
|
|
- FSVolume vol = volumeMap.get(b).getVolume();
|
|
|
+ FSVolume vol = getReplicaInfo(b).getVolume();
|
|
|
size = vol.getTmpFile(b).length();
|
|
|
}
|
|
|
if (size < dataOffset) {
|
|
@@ -1111,7 +1093,7 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
|
|
|
synchronized File createTmpFile( FSVolume vol, Block blk ) throws IOException {
|
|
|
if ( vol == null ) {
|
|
|
- vol = volumeMap.get( blk ).getVolume();
|
|
|
+ vol = getReplicaInfo( blk ).getVolume();
|
|
|
if ( vol == null ) {
|
|
|
throw new IOException("Could not find volume for block " + blk);
|
|
|
}
|
|
@@ -1131,40 +1113,43 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
* Complete the block write!
|
|
|
*/
|
|
|
public synchronized void finalizeBlock(Block b) throws IOException {
|
|
|
- ActiveFile activeFile = ongoingCreates.get(b);
|
|
|
- if (activeFile == null) {
|
|
|
+ ReplicaInfo replicaInfo = getReplicaInfo(b);
|
|
|
+ if (replicaInfo.getState() == ReplicaState.FINALIZED) {
|
|
|
throw new IOException("Block " + b + " is already finalized.");
|
|
|
}
|
|
|
- File f = activeFile.file;
|
|
|
- if (f == null || !f.exists()) {
|
|
|
- throw new IOException("No temporary file " + f + " for block " + b);
|
|
|
- }
|
|
|
- FSVolume v = volumeMap.get(b).getVolume();
|
|
|
- if (v == null) {
|
|
|
- throw new IOException("No volume for temporary file " + f +
|
|
|
- " for block " + b);
|
|
|
+ ReplicaInfo newReplicaInfo = null;
|
|
|
+ if (replicaInfo.getState() == ReplicaState.RUR &&
|
|
|
+ ((ReplicaUnderRecovery)replicaInfo).getOrignalReplicaState() ==
|
|
|
+ ReplicaState.FINALIZED) {
|
|
|
+ newReplicaInfo = ((ReplicaUnderRecovery)replicaInfo).getOriginalReplica();
|
|
|
+ } else {
|
|
|
+ FSVolume v = replicaInfo.getVolume();
|
|
|
+ File f = replicaInfo.getBlockFile();
|
|
|
+ if (v == null) {
|
|
|
+ throw new IOException("No volume for temporary file " + f +
|
|
|
+ " for block " + b);
|
|
|
+ }
|
|
|
+
|
|
|
+ File dest = v.addBlock(b, f);
|
|
|
+ newReplicaInfo = new FinalizedReplica(b, v, dest.getParentFile());
|
|
|
}
|
|
|
-
|
|
|
- File dest = null;
|
|
|
- dest = v.addBlock(b, f);
|
|
|
- volumeMap.put(b, new ReplicaInfo(v, dest));
|
|
|
- ongoingCreates.remove(b);
|
|
|
+ volumeMap.add(newReplicaInfo);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Remove the temporary block file (if any)
|
|
|
*/
|
|
|
public synchronized void unfinalizeBlock(Block b) throws IOException {
|
|
|
- // remove the block from in-memory data structure
|
|
|
- ActiveFile activefile = ongoingCreates.remove(b);
|
|
|
- if (activefile == null) {
|
|
|
- return;
|
|
|
- }
|
|
|
- volumeMap.remove(b);
|
|
|
-
|
|
|
- // delete the on-disk temp file
|
|
|
- if (delBlockFromDisk(activefile.file, getMetaFile(activefile.file, b), b)) {
|
|
|
- DataNode.LOG.warn("Block " + b + " unfinalized and removed. " );
|
|
|
+ ReplicaInfo replicaInfo = volumeMap.get(b);
|
|
|
+ if (replicaInfo != null && replicaInfo.getState() == ReplicaState.TEMPORARY) {
|
|
|
+ // remove from volumeMap
|
|
|
+ volumeMap.remove(b);
|
|
|
+
|
|
|
+ // delete the on-disk temp file
|
|
|
+ if (delBlockFromDisk(replicaInfo.getBlockFile(),
|
|
|
+ replicaInfo.getMetaFile(), b)) {
|
|
|
+ DataNode.LOG.warn("Block " + b + " unfinalized and removed. " );
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -1200,8 +1185,8 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
public Block[] getBlockReport() {
|
|
|
ArrayList<Block> list = new ArrayList<Block>(volumeMap.size());
|
|
|
synchronized(this) {
|
|
|
- for (Block b : volumeMap.keySet()) {
|
|
|
- if (!ongoingCreates.containsKey(b)) {
|
|
|
+ for (ReplicaInfo b : volumeMap.replicas()) {
|
|
|
+ if (b.getState() == ReplicaState.FINALIZED) {
|
|
|
list.add(new Block(b));
|
|
|
}
|
|
|
}
|
|
@@ -1216,7 +1201,7 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
* is needed to handle concurrent modification to the block.
|
|
|
*/
|
|
|
synchronized Block[] getBlockList(boolean deepcopy) {
|
|
|
- Block[] list = volumeMap.keySet().toArray(new Block[volumeMap.size()]);
|
|
|
+ Block[] list = volumeMap.replicas().toArray(new Block[volumeMap.size()]);
|
|
|
if (deepcopy) {
|
|
|
for (int i = 0; i < list.length; i++) {
|
|
|
list[i] = new Block(list[i]);
|
|
@@ -1227,9 +1212,15 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
|
|
|
/**
|
|
|
* Check whether the given block is a valid one.
|
|
|
+ * A block is valid if its replica is finalized and its block file exists.
|
|
|
*/
|
|
|
public boolean isValidBlock(Block b) {
|
|
|
- return validateBlockFile(b) != null;
|
|
|
+ ReplicaInfo replicaInfo = volumeMap.get(b);
|
|
|
+ if (replicaInfo == null ||
|
|
|
+ replicaInfo.getState() != ReplicaState.FINALIZED) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ return replicaInfo.getBlockFile().exists();
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -1248,10 +1239,7 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
|
|
|
/** {@inheritDoc} */
|
|
|
public void validateBlockMetadata(Block b) throws IOException {
|
|
|
- ReplicaInfo info = volumeMap.get(b);
|
|
|
- if (info == null) {
|
|
|
- throw new IOException("Block " + b + " does not exist in volumeMap.");
|
|
|
- }
|
|
|
+ ReplicaInfo info = getReplicaInfo(b);
|
|
|
FSVolume v = info.getVolume();
|
|
|
File tmp = v.getTmpFile(b);
|
|
|
File f = getFile(b);
|
|
@@ -1307,7 +1295,8 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
synchronized (this) {
|
|
|
f = getFile(invalidBlks[i]);
|
|
|
ReplicaInfo dinfo = volumeMap.get(invalidBlks[i]);
|
|
|
- if (dinfo == null) {
|
|
|
+ if (dinfo == null ||
|
|
|
+ dinfo.getGenerationStamp() != invalidBlks[i].getGenerationStamp()) {
|
|
|
DataNode.LOG.warn("Unexpected error trying to delete block "
|
|
|
+ invalidBlks[i] +
|
|
|
". BlockInfo not found in volumeMap.");
|
|
@@ -1366,16 +1355,24 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Turn the block identifier into a filename.
|
|
|
+ * Turn the block identifier into a filename; the generation stamp is ignored.
|
|
|
*/
|
|
|
public synchronized File getFile(Block b) {
|
|
|
- ReplicaInfo info = volumeMap.get(b);
|
|
|
+ return getFile(b.getBlockId());
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Turn the block identifier into a filename
|
|
|
+ * @param blockId a block's id
|
|
|
+ * @return on disk data file path; null if the replica does not exist
|
|
|
+ */
|
|
|
+ private File getFile(long blockId) {
|
|
|
+ ReplicaInfo info = volumeMap.get(blockId);
|
|
|
if (info != null) {
|
|
|
- return info.getFile();
|
|
|
+ return info.getBlockFile();
|
|
|
}
|
|
|
- return null;
|
|
|
+ return null;
|
|
|
}
|
|
|
-
|
|
|
/**
|
|
|
* check if a data directory is healthy
|
|
|
* @throws DiskErrorException
|
|
@@ -1459,11 +1456,12 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
*/
|
|
|
public void checkAndUpdate(long blockId, File diskFile,
|
|
|
File diskMetaFile, FSVolume vol) {
|
|
|
- Block block = new Block(blockId);
|
|
|
DataNode datanode = DataNode.getDataNode();
|
|
|
Block corruptBlock = null;
|
|
|
+ ReplicaInfo memBlockInfo;
|
|
|
synchronized (this) {
|
|
|
- if (ongoingCreates.get(block) != null) {
|
|
|
+ memBlockInfo = volumeMap.get(blockId);
|
|
|
+ if (memBlockInfo != null && memBlockInfo.getState() != ReplicaState.FINALIZED) {
|
|
|
// Block is not finalized - ignore the difference
|
|
|
return;
|
|
|
}
|
|
@@ -1472,7 +1470,6 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
Block.getGenerationStamp(diskMetaFile.getName()) :
|
|
|
Block.GRANDFATHER_GENERATION_STAMP;
|
|
|
|
|
|
- ReplicaInfo memBlockInfo = volumeMap.get(block);
|
|
|
if (diskFile == null || !diskFile.exists()) {
|
|
|
if (memBlockInfo == null) {
|
|
|
// Block file does not exist and block does not exist in memory
|
|
@@ -1484,14 +1481,14 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
}
|
|
|
return;
|
|
|
}
|
|
|
- if (!memBlockInfo.getFile().exists()) {
|
|
|
+ if (!memBlockInfo.getBlockFile().exists()) {
|
|
|
// Block is in memory and not on the disk
|
|
|
// Remove the block from volumeMap
|
|
|
- volumeMap.remove(block);
|
|
|
+ volumeMap.remove(blockId);
|
|
|
if (datanode.blockScanner != null) {
|
|
|
- datanode.blockScanner.deleteBlock(block);
|
|
|
+ datanode.blockScanner.deleteBlock(new Block(blockId));
|
|
|
}
|
|
|
- DataNode.LOG.warn("Removed block " + block.getBlockId()
|
|
|
+ DataNode.LOG.warn("Removed block " + blockId
|
|
|
+ " from memory with missing block file on the disk");
|
|
|
// Finally remove the metadata file
|
|
|
if (diskMetaFile != null && diskMetaFile.exists()
|
|
@@ -1507,23 +1504,20 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
*/
|
|
|
if (memBlockInfo == null) {
|
|
|
// Block is missing in memory - add the block to volumeMap
|
|
|
- ReplicaInfo diskBlockInfo = new ReplicaInfo(vol, diskFile);
|
|
|
- Block diskBlock = new Block(diskFile, diskFile.length(), diskGS);
|
|
|
- volumeMap.put(diskBlock, diskBlockInfo);
|
|
|
+ ReplicaInfo diskBlockInfo = new FinalizedReplica(blockId,
|
|
|
+ diskFile.length(), diskGS, vol, diskFile.getParentFile());
|
|
|
+ volumeMap.add(diskBlockInfo);
|
|
|
if (datanode.blockScanner != null) {
|
|
|
- datanode.blockScanner.addBlock(diskBlock);
|
|
|
+ datanode.blockScanner.addBlock(diskBlockInfo);
|
|
|
}
|
|
|
- DataNode.LOG.warn("Added missing block to memory " + diskBlock);
|
|
|
+ DataNode.LOG.warn("Added missing block to memory " + (Block)diskBlockInfo);
|
|
|
return;
|
|
|
}
|
|
|
/*
|
|
|
* Block exists in volumeMap and the block file exists on the disk
|
|
|
*/
|
|
|
- // Iterate to get key from volumeMap for the blockId
|
|
|
- Block memBlock = getBlockKey(blockId);
|
|
|
-
|
|
|
// Compare block files
|
|
|
- File memFile = memBlockInfo.getFile();
|
|
|
+ File memFile = memBlockInfo.getBlockFile();
|
|
|
if (memFile.exists()) {
|
|
|
if (memFile.compareTo(diskFile) != 0) {
|
|
|
DataNode.LOG.warn("Block file " + memFile.getAbsolutePath()
|
|
@@ -1540,19 +1534,17 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
+ memFile.getAbsolutePath()
|
|
|
+ " does not exist. Updating it to the file found during scan "
|
|
|
+ diskFile.getAbsolutePath());
|
|
|
- ReplicaInfo info = volumeMap.remove(memBlock);
|
|
|
- info.setFile(diskFile);
|
|
|
+ memBlockInfo.setDir(diskFile.getParentFile());
|
|
|
memFile = diskFile;
|
|
|
|
|
|
DataNode.LOG.warn("Updating generation stamp for block " + blockId
|
|
|
- + " from " + memBlock.getGenerationStamp() + " to " + diskGS);
|
|
|
- memBlock.setGenerationStamp(diskGS);
|
|
|
- volumeMap.put(memBlock, info);
|
|
|
+ + " from " + memBlockInfo.getGenerationStamp() + " to " + diskGS);
|
|
|
+ memBlockInfo.setGenerationStamp(diskGS);
|
|
|
}
|
|
|
|
|
|
// Compare generation stamp
|
|
|
- if (memBlock.getGenerationStamp() != diskGS) {
|
|
|
- File memMetaFile = getMetaFile(diskFile, memBlock);
|
|
|
+ if (memBlockInfo.getGenerationStamp() != diskGS) {
|
|
|
+ File memMetaFile = getMetaFile(diskFile, memBlockInfo);
|
|
|
if (memMetaFile.exists()) {
|
|
|
if (memMetaFile.compareTo(diskMetaFile) != 0) {
|
|
|
DataNode.LOG.warn("Metadata file in memory "
|
|
@@ -1569,23 +1561,19 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
: Block.GRANDFATHER_GENERATION_STAMP;
|
|
|
|
|
|
DataNode.LOG.warn("Updating generation stamp for block " + blockId
|
|
|
- + " from " + memBlock.getGenerationStamp() + " to " + gs);
|
|
|
+ + " from " + memBlockInfo.getGenerationStamp() + " to " + gs);
|
|
|
|
|
|
- ReplicaInfo info = volumeMap.remove(memBlock);
|
|
|
- memBlock.setGenerationStamp(gs);
|
|
|
- volumeMap.put(memBlock, info);
|
|
|
+ memBlockInfo.setGenerationStamp(gs);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// Compare block size
|
|
|
- if (memBlock.getNumBytes() != memFile.length()) {
|
|
|
+ if (memBlockInfo.getNumBytes() != memFile.length()) {
|
|
|
// Update the length based on the block file
|
|
|
- corruptBlock = new Block(memBlock);
|
|
|
+ corruptBlock = new Block(memBlockInfo);
|
|
|
DataNode.LOG.warn("Updating size of block " + blockId + " from "
|
|
|
- + memBlock.getNumBytes() + " to " + memFile.length());
|
|
|
- ReplicaInfo info = volumeMap.remove(memBlock);
|
|
|
- memBlock.setNumBytes(memFile.length());
|
|
|
- volumeMap.put(memBlock, info);
|
|
|
+ + memBlockInfo.getNumBytes() + " to " + memFile.length());
|
|
|
+ memBlockInfo.setNumBytes(memFile.length());
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -1605,18 +1593,14 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Get reference to the key in the volumeMap. To be called from methods that
|
|
|
+ * Get reference to the replica meta info in the replicasMap.
|
|
|
+ * To be called from methods that
|
|
|
* are synchronized on {@link FSDataset}
|
|
|
* @param blockId
|
|
|
- * @return key from the volumeMap
|
|
|
+ * @return replica's meta information from the replicas map
|
|
|
*/
|
|
|
- Block getBlockKey(long blockId) {
|
|
|
+ ReplicaInfo getBlock(long blockId) {
|
|
|
assert(Thread.holdsLock(this));
|
|
|
- for (Block b : volumeMap.keySet()) {
|
|
|
- if (b.getBlockId() == blockId) {
|
|
|
- return b;
|
|
|
- }
|
|
|
- }
|
|
|
- return null;
|
|
|
+ return volumeMap.get(blockId);
|
|
|
}
|
|
|
}
|