|
@@ -21,6 +21,7 @@ import java.io.BufferedInputStream;
|
|
import java.io.DataInputStream;
|
|
import java.io.DataInputStream;
|
|
import java.io.File;
|
|
import java.io.File;
|
|
import java.io.FileInputStream;
|
|
import java.io.FileInputStream;
|
|
|
|
+import java.io.FileNotFoundException;
|
|
import java.io.FileOutputStream;
|
|
import java.io.FileOutputStream;
|
|
import java.io.FilenameFilter;
|
|
import java.io.FilenameFilter;
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
@@ -382,14 +383,6 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
return createTmpFile(b, f);
|
|
return createTmpFile(b, f);
|
|
}
|
|
}
|
|
|
|
|
|
- /**
|
|
|
|
- * Returns the name of the temporary file for this block.
|
|
|
|
- */
|
|
|
|
- File getTmpFile(Block b) throws IOException {
|
|
|
|
- File f = new File(tmpDir, b.getBlockName());
|
|
|
|
- return f;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
/**
|
|
/**
|
|
* Files used for copy-on-write. They need recovery when datanode
|
|
* Files used for copy-on-write. They need recovery when datanode
|
|
* restarts.
|
|
* restarts.
|
|
@@ -1036,6 +1029,11 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
|
|
|
static private void truncateBlock(File blockFile, File metaFile,
|
|
static private void truncateBlock(File blockFile, File metaFile,
|
|
long oldlen, long newlen) throws IOException {
|
|
long oldlen, long newlen) throws IOException {
|
|
|
|
+ DataNode.LOG.info("truncateBlock: blockFile=" + blockFile
|
|
|
|
+ + ", metaFile=" + metaFile
|
|
|
|
+ + ", oldlen=" + oldlen
|
|
|
|
+ + ", newlen=" + newlen);
|
|
|
|
+
|
|
if (newlen == oldlen) {
|
|
if (newlen == oldlen) {
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
@@ -1419,11 +1417,17 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
// been opened for append but never modified
|
|
// been opened for append but never modified
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
- ReplicaInfo newReplicaInfo = null;
|
|
|
|
|
|
+ finalizeReplica(replicaInfo);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private synchronized FinalizedReplica finalizeReplica(ReplicaInfo replicaInfo)
|
|
|
|
+ throws IOException {
|
|
|
|
+ FinalizedReplica newReplicaInfo = null;
|
|
if (replicaInfo.getState() == ReplicaState.RUR &&
|
|
if (replicaInfo.getState() == ReplicaState.RUR &&
|
|
((ReplicaUnderRecovery)replicaInfo).getOrignalReplicaState() ==
|
|
((ReplicaUnderRecovery)replicaInfo).getOrignalReplicaState() ==
|
|
ReplicaState.FINALIZED) {
|
|
ReplicaState.FINALIZED) {
|
|
- newReplicaInfo = ((ReplicaUnderRecovery)replicaInfo).getOriginalReplica();
|
|
|
|
|
|
+ newReplicaInfo = (FinalizedReplica)
|
|
|
|
+ ((ReplicaUnderRecovery)replicaInfo).getOriginalReplica();
|
|
} else {
|
|
} else {
|
|
FSVolume v = replicaInfo.getVolume();
|
|
FSVolume v = replicaInfo.getVolume();
|
|
File f = replicaInfo.getBlockFile();
|
|
File f = replicaInfo.getBlockFile();
|
|
@@ -1436,6 +1440,7 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
newReplicaInfo = new FinalizedReplica(replicaInfo, v, dest.getParentFile());
|
|
newReplicaInfo = new FinalizedReplica(replicaInfo, v, dest.getParentFile());
|
|
}
|
|
}
|
|
volumeMap.add(newReplicaInfo);
|
|
volumeMap.add(newReplicaInfo);
|
|
|
|
+ return newReplicaInfo;
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -1578,46 +1583,25 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
|
|
|
|
/** {@inheritDoc} */
|
|
/** {@inheritDoc} */
|
|
public void validateBlockMetadata(Block b) throws IOException {
|
|
public void validateBlockMetadata(Block b) throws IOException {
|
|
- ReplicaInfo info = getReplicaInfo(b);
|
|
|
|
- FSVolume v = info.getVolume();
|
|
|
|
- File tmp = v.getTmpFile(b);
|
|
|
|
- File f = getFile(b);
|
|
|
|
- if (f == null) {
|
|
|
|
- f = tmp;
|
|
|
|
- }
|
|
|
|
- if (f == null) {
|
|
|
|
- throw new IOException("Block " + b + " does not exist on disk.");
|
|
|
|
- }
|
|
|
|
|
|
+ checkReplicaFiles(getReplicaInfo(b));
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /** Check the files of a replica. */
|
|
|
|
+ static void checkReplicaFiles(final ReplicaInfo r) throws IOException {
|
|
|
|
+ final File f = r.getBlockFile();
|
|
if (!f.exists()) {
|
|
if (!f.exists()) {
|
|
- throw new IOException("Block " + b +
|
|
|
|
- " block file " + f +
|
|
|
|
- " does not exist on disk.");
|
|
|
|
- }
|
|
|
|
- if (b.getNumBytes() != f.length()) {
|
|
|
|
- throw new IOException("Block " + b +
|
|
|
|
- " length is " + b.getNumBytes() +
|
|
|
|
- " does not match block file length " +
|
|
|
|
- f.length());
|
|
|
|
- }
|
|
|
|
- File meta = getMetaFile(f, b);
|
|
|
|
- if (meta == null) {
|
|
|
|
- throw new IOException("Block " + b +
|
|
|
|
- " metafile does not exist.");
|
|
|
|
|
|
+ throw new FileNotFoundException("File " + f + " not found, r=" + r);
|
|
}
|
|
}
|
|
- if (!meta.exists()) {
|
|
|
|
- throw new IOException("Block " + b +
|
|
|
|
- " metafile " + meta +
|
|
|
|
- " does not exist on disk.");
|
|
|
|
|
|
+ if (r.getNumBytes() != f.length()) {
|
|
|
|
+ throw new IOException("File length mismatched."
|
|
|
|
+ + f + " length is " + f.length() + " but r=" + r);
|
|
}
|
|
}
|
|
- if (meta.length() == 0) {
|
|
|
|
- throw new IOException("Block " + b + " metafile " + meta + " is empty.");
|
|
|
|
|
|
+ final File metafile = getMetaFile(f, r);
|
|
|
|
+ if (!metafile.exists()) {
|
|
|
|
+ throw new IOException("Metafile " + metafile + " does not exist, r=" + r);
|
|
}
|
|
}
|
|
- long stamp = parseGenerationStamp(f, meta);
|
|
|
|
- if (stamp != b.getGenerationStamp()) {
|
|
|
|
- throw new IOException("Block " + b +
|
|
|
|
- " genstamp is " + b.getGenerationStamp() +
|
|
|
|
- " does not match meta file stamp " +
|
|
|
|
- stamp);
|
|
|
|
|
|
+ if (metafile.length() == 0) {
|
|
|
|
+ throw new IOException("Metafile " + metafile + " is empty, r=" + r);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -2045,4 +2029,70 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
|
}
|
|
}
|
|
return rur.createInfo();
|
|
return rur.createInfo();
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ /** Update a replica of a block. */
|
|
|
|
+ synchronized void updateReplica(final Block block, final long recoveryId,
|
|
|
|
+ final long newlength) throws IOException {
|
|
|
|
+ //get replica
|
|
|
|
+ final ReplicaInfo replica = volumeMap.get(block.getBlockId());
|
|
|
|
+ DataNode.LOG.info("updateReplica: block=" + block
|
|
|
|
+ + ", recoveryId=" + recoveryId
|
|
|
|
+ + ", length=" + newlength
|
|
|
|
+ + ", replica=" + replica);
|
|
|
|
+
|
|
|
|
+ //check replica
|
|
|
|
+ if (replica == null) {
|
|
|
|
+ throw new ReplicaNotFoundException(block);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ //check replica state
|
|
|
|
+ if (replica.getState() != ReplicaState.RUR) {
|
|
|
|
+ throw new IOException("replica.getState() != " + ReplicaState.RUR
|
|
|
|
+ + ", replica=" + replica);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ //check replica files before update
|
|
|
|
+ checkReplicaFiles(replica);
|
|
|
|
+
|
|
|
|
+ //update replica
|
|
|
|
+ final FinalizedReplica finalized = updateReplicaUnderRecovery(
|
|
|
|
+ (ReplicaUnderRecovery)replica, recoveryId, newlength);
|
|
|
|
+
|
|
|
|
+ //check replica files after update
|
|
|
|
+ checkReplicaFiles(finalized);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /** Update a ReplicaUnderRecovery to a FinalizedReplica. */
|
|
|
|
+ FinalizedReplica updateReplicaUnderRecovery(
|
|
|
|
+ final ReplicaUnderRecovery rur, final long recoveryId,
|
|
|
|
+ final long newlength) throws IOException {
|
|
|
|
+ DataNode.LOG.info("updateReplicaUnderRecovery: recoveryId=" + recoveryId
|
|
|
|
+ + ", newlength=" + newlength
|
|
|
|
+ + ", rur=" + rur);
|
|
|
|
+
|
|
|
|
+ //check recovery id
|
|
|
|
+ if (rur.getRecoveryID() != recoveryId) {
|
|
|
|
+ throw new IOException("rur.getRecoveryID() != recoveryId = " + recoveryId
|
|
|
|
+ + ", rur=" + rur);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // bump rur's GS to be recovery id
|
|
|
|
+ bumpReplicaGS(rur, recoveryId);
|
|
|
|
+
|
|
|
|
+ //update length
|
|
|
|
+ final File replicafile = rur.getBlockFile();
|
|
|
|
+ if (rur.getNumBytes() < newlength) {
|
|
|
|
+ throw new IOException("rur.getNumBytes() < newlength = " + newlength
|
|
|
|
+ + ", rur=" + rur);
|
|
|
|
+ }
|
|
|
|
+ if (rur.getNumBytes() > newlength) {
|
|
|
|
+ rur.detachBlock(1);
|
|
|
|
+ truncateBlock(replicafile, rur.getMetaFile(), rur.getNumBytes(), newlength);
|
|
|
|
+ // update RUR with the new length
|
|
|
|
+ rur.setNumBytes(newlength);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // finalize the block
|
|
|
|
+ return finalizeReplica(rur);
|
|
|
|
+ }
|
|
}
|
|
}
|