|
@@ -71,7 +71,6 @@ import org.apache.hadoop.hdfs.server.common.GenerationStamp;
|
|
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
|
|
|
import org.apache.hadoop.hdfs.server.common.Storage;
|
|
|
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
|
|
|
-import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
|
|
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
|
|
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
|
|
|
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
|
|
@@ -928,8 +927,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
targetVolume, blockFiles[0].getParentFile(), 0);
|
|
|
newReplicaInfo.setNumBytes(blockFiles[1].length());
|
|
|
// Finalize the copied files
|
|
|
- newReplicaInfo = finalizeReplica(block.getBlockPoolId(), newReplicaInfo,
|
|
|
- false);
|
|
|
+ newReplicaInfo = finalizeReplica(block.getBlockPoolId(), newReplicaInfo);
|
|
|
|
|
|
removeOldReplica(replicaInfo, newReplicaInfo, oldBlockFile, oldMetaFile,
|
|
|
oldBlockFile.length(), oldMetaFile.length(), block.getBlockPoolId());
|
|
@@ -1254,7 +1252,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
bumpReplicaGS(replicaInfo, newGS);
|
|
|
// finalize the replica if RBW
|
|
|
if (replicaInfo.getState() == ReplicaState.RBW) {
|
|
|
- finalizeReplica(b.getBlockPoolId(), replicaInfo, false);
|
|
|
+ finalizeReplica(b.getBlockPoolId(), replicaInfo);
|
|
|
}
|
|
|
return replicaInfo;
|
|
|
}
|
|
@@ -1583,23 +1581,38 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
* Complete the block write!
|
|
|
*/
|
|
|
@Override // FsDatasetSpi
|
|
|
- public synchronized void finalizeBlock(ExtendedBlock b, boolean fsyncDir)
|
|
|
+ public void finalizeBlock(ExtendedBlock b, boolean fsyncDir)
|
|
|
throws IOException {
|
|
|
- if (Thread.interrupted()) {
|
|
|
- // Don't allow data modifications from interrupted threads
|
|
|
- throw new IOException("Cannot finalize block from Interrupted Thread");
|
|
|
+ ReplicaInfo replicaInfo = null;
|
|
|
+ ReplicaInfo finalizedReplicaInfo = null;
|
|
|
+ synchronized (this) {
|
|
|
+ if (Thread.interrupted()) {
|
|
|
+ // Don't allow data modifications from interrupted threads
|
|
|
+ throw new IOException("Cannot finalize block from Interrupted Thread");
|
|
|
+ }
|
|
|
+ replicaInfo = getReplicaInfo(b);
|
|
|
+ if (replicaInfo.getState() == ReplicaState.FINALIZED) {
|
|
|
+ // this is legal, when recovery happens on a file that has
|
|
|
+ // been opened for append but never modified
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ finalizedReplicaInfo = finalizeReplica(b.getBlockPoolId(), replicaInfo);
|
|
|
}
|
|
|
- ReplicaInfo replicaInfo = getReplicaInfo(b);
|
|
|
- if (replicaInfo.getState() == ReplicaState.FINALIZED) {
|
|
|
- // this is legal, when recovery happens on a file that has
|
|
|
- // been opened for append but never modified
|
|
|
- return;
|
|
|
+ /*
|
|
|
+ * Sync the directory after rename from tmp/rbw to Finalized if
|
|
|
+ * configured. Though rename should be atomic operation, sync on both
|
|
|
+ * dest and src directories are done because IOUtils.fsync() calls
|
|
|
+ * directory's channel sync, not the journal itself.
|
|
|
+ */
|
|
|
+ if (fsyncDir) {
|
|
|
+ File f = replicaInfo.getBlockFile();
|
|
|
+ File dest = finalizedReplicaInfo.getBlockFile();
|
|
|
+ fsyncDirectory(dest.getParentFile(), f.getParentFile());
|
|
|
}
|
|
|
- finalizeReplica(b.getBlockPoolId(), replicaInfo, fsyncDir);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
private synchronized FinalizedReplica finalizeReplica(String bpid,
|
|
|
- ReplicaInfo replicaInfo, boolean fsyncDir) throws IOException {
|
|
|
+ ReplicaInfo replicaInfo) throws IOException {
|
|
|
FinalizedReplica newReplicaInfo = null;
|
|
|
if (replicaInfo.getState() == ReplicaState.RUR &&
|
|
|
((ReplicaUnderRecovery)replicaInfo).getOriginalReplica().getState() ==
|
|
@@ -1610,23 +1623,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
FsVolumeImpl v = (FsVolumeImpl)replicaInfo.getVolume();
|
|
|
File f = replicaInfo.getBlockFile();
|
|
|
if (v == null) {
|
|
|
- throw new IOException("No volume for temporary file " + f +
|
|
|
+ throw new IOException("No volume for temporary file " + f +
|
|
|
" for block " + replicaInfo);
|
|
|
}
|
|
|
|
|
|
File dest = v.addFinalizedBlock(
|
|
|
bpid, replicaInfo, f, replicaInfo.getBytesReserved());
|
|
|
newReplicaInfo = new FinalizedReplica(replicaInfo, v, dest.getParentFile());
|
|
|
-
|
|
|
- /*
|
|
|
- * Sync the directory after rename from tmp/rbw to Finalized if
|
|
|
- * configured. Though rename should be atomic operation, sync on both
|
|
|
- * dest and src directories are done because IOUtils.fsync() calls
|
|
|
- * directory's channel sync, not the journal itself.
|
|
|
- */
|
|
|
- if (fsyncDir) {
|
|
|
- fsyncDirectory(dest.getParentFile(), f.getParentFile());
|
|
|
- }
|
|
|
if (v.isTransientStorage()) {
|
|
|
ramDiskReplicaTracker.addReplica(bpid, replicaInfo.getBlockId(), v);
|
|
|
datanode.getMetrics().addRamDiskBytesWrite(replicaInfo.getNumBytes());
|
|
@@ -2553,12 +2556,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
// but it is immediately converted to finalized state within the same
|
|
|
// lock, so no need to update it.
|
|
|
volumeMap.add(bpid, newReplicaInfo);
|
|
|
- finalizeReplica(bpid, newReplicaInfo, false);
|
|
|
+ finalizeReplica(bpid, newReplicaInfo);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// finalize the block
|
|
|
- return finalizeReplica(bpid, rur, false);
|
|
|
+ return finalizeReplica(bpid, rur);
|
|
|
}
|
|
|
|
|
|
private File[] copyReplicaWithNewBlockIdAndGS(
|