|
@@ -1466,38 +1466,28 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
}
|
|
|
|
|
|
@Override // FsDatasetSpi
|
|
|
- public ReplicaHandler createTemporary(
|
|
|
- StorageType storageType, ExtendedBlock b) throws IOException {
|
|
|
+ public ReplicaHandler createTemporary(StorageType storageType,
|
|
|
+ ExtendedBlock b, boolean isTransfer) throws IOException {
|
|
|
long startTimeMs = Time.monotonicNow();
|
|
|
long writerStopTimeoutMs = datanode.getDnConf().getXceiverStopTimeout();
|
|
|
ReplicaInfo lastFoundReplicaInfo = null;
|
|
|
+ boolean isInPipeline = false;
|
|
|
do {
|
|
|
synchronized (this) {
|
|
|
ReplicaInfo currentReplicaInfo =
|
|
|
volumeMap.get(b.getBlockPoolId(), b.getBlockId());
|
|
|
if (currentReplicaInfo == lastFoundReplicaInfo) {
|
|
|
- if (lastFoundReplicaInfo != null) {
|
|
|
- invalidate(b.getBlockPoolId(), new Block[] { lastFoundReplicaInfo });
|
|
|
- }
|
|
|
- FsVolumeReference ref =
|
|
|
- volumes.getNextVolume(storageType, b.getNumBytes());
|
|
|
- FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
|
|
|
- // create a temporary file to hold block in the designated volume
|
|
|
- File f;
|
|
|
- try {
|
|
|
- f = v.createTmpFile(b.getBlockPoolId(), b.getLocalBlock());
|
|
|
- } catch (IOException e) {
|
|
|
- IOUtils.cleanup(null, ref);
|
|
|
- throw e;
|
|
|
- }
|
|
|
- ReplicaInPipeline newReplicaInfo =
|
|
|
- new ReplicaInPipeline(b.getBlockId(), b.getGenerationStamp(), v,
|
|
|
- f.getParentFile(), 0);
|
|
|
- volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
|
|
|
- return new ReplicaHandler(newReplicaInfo, ref);
|
|
|
+ break;
|
|
|
} else {
|
|
|
- if (!(currentReplicaInfo.getGenerationStamp() < b
|
|
|
- .getGenerationStamp() && currentReplicaInfo instanceof ReplicaInPipeline)) {
|
|
|
+ isInPipeline = currentReplicaInfo.getState() == ReplicaState.TEMPORARY
|
|
|
+ || currentReplicaInfo.getState() == ReplicaState.RBW;
|
|
|
+ /*
|
|
|
+ * If the current block is old, reject.
|
|
|
+ * else If transfer request, then accept it.
|
|
|
+ * else if state is not RBW/Temporary, then reject
|
|
|
+ */
|
|
|
+ if ((currentReplicaInfo.getGenerationStamp() >= b.getGenerationStamp())
|
|
|
+ || (!isTransfer && !isInPipeline)) {
|
|
|
throw new ReplicaAlreadyExistsException("Block " + b
|
|
|
+ " already exists in state " + currentReplicaInfo.getState()
|
|
|
+ " and thus cannot be created.");
|
|
@@ -1506,6 +1496,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ if (!isInPipeline) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
// Hang too long, just bail out. This is not supposed to happen.
|
|
|
long writerStopMs = Time.monotonicNow() - startTimeMs;
|
|
|
if (writerStopMs > writerStopTimeoutMs) {
|
|
@@ -1519,6 +1512,31 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
((ReplicaInPipeline) lastFoundReplicaInfo)
|
|
|
.stopWriter(writerStopTimeoutMs);
|
|
|
} while (true);
|
|
|
+
|
|
|
+ if (lastFoundReplicaInfo != null) {
|
|
|
+ // Old blockfile should be deleted synchronously as it might collide
|
|
|
+ // with the new block if allocated in same volume.
|
|
|
+ // Do the deletion outside of lock as its DISK IO.
|
|
|
+ invalidate(b.getBlockPoolId(), new Block[] { lastFoundReplicaInfo },
|
|
|
+ false);
|
|
|
+ }
|
|
|
+ synchronized (this) {
|
|
|
+ FsVolumeReference ref = volumes.getNextVolume(storageType, b
|
|
|
+ .getNumBytes());
|
|
|
+ FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
|
|
|
+ // create a temporary file to hold block in the designated volume
|
|
|
+ File f;
|
|
|
+ try {
|
|
|
+ f = v.createTmpFile(b.getBlockPoolId(), b.getLocalBlock());
|
|
|
+ } catch (IOException e) {
|
|
|
+ IOUtils.cleanup(null, ref);
|
|
|
+ throw e;
|
|
|
+ }
|
|
|
+ ReplicaInPipeline newReplicaInfo = new ReplicaInPipeline(b.getBlockId(), b
|
|
|
+ .getGenerationStamp(), v, f.getParentFile(), 0);
|
|
|
+ volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
|
|
|
+ return new ReplicaHandler(newReplicaInfo, ref);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -1849,6 +1867,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
*/
|
|
|
@Override // FsDatasetSpi
|
|
|
public void invalidate(String bpid, Block invalidBlks[]) throws IOException {
|
|
|
+ invalidate(bpid, invalidBlks, true);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void invalidate(String bpid, Block[] invalidBlks, boolean async)
|
|
|
+ throws IOException {
|
|
|
final List<String> errors = new ArrayList<String>();
|
|
|
for (int i = 0; i < invalidBlks.length; i++) {
|
|
|
final File f;
|
|
@@ -1910,14 +1933,22 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
// If the block is cached, start uncaching it.
|
|
|
cacheManager.uncacheBlock(bpid, invalidBlks[i].getBlockId());
|
|
|
|
|
|
- // Delete the block asynchronously to make sure we can do it fast enough.
|
|
|
- // It's ok to unlink the block file before the uncache operation
|
|
|
- // finishes.
|
|
|
try {
|
|
|
- asyncDiskService.deleteAsync(v.obtainReference(), f,
|
|
|
- FsDatasetUtil.getMetaFile(f, invalidBlks[i].getGenerationStamp()),
|
|
|
- new ExtendedBlock(bpid, invalidBlks[i]),
|
|
|
- dataStorage.getTrashDirectoryForBlockFile(bpid, f));
|
|
|
+ // Delete the block asynchronously to make sure we can do it fast
|
|
|
+ // enough.
|
|
|
+ // It's ok to unlink the block file before the uncache operation
|
|
|
+ // finishes.
|
|
|
+ if (async) {
|
|
|
+ asyncDiskService.deleteAsync(v.obtainReference(), f,
|
|
|
+ FsDatasetUtil.getMetaFile(f, invalidBlks[i].getGenerationStamp()),
|
|
|
+ new ExtendedBlock(bpid, invalidBlks[i]),
|
|
|
+ dataStorage.getTrashDirectoryForBlockFile(bpid, f));
|
|
|
+ } else {
|
|
|
+ asyncDiskService.deleteSync(v.obtainReference(), f,
|
|
|
+ FsDatasetUtil.getMetaFile(f, invalidBlks[i].getGenerationStamp()),
|
|
|
+ new ExtendedBlock(bpid, invalidBlks[i]),
|
|
|
+ dataStorage.getTrashDirectoryForBlockFile(bpid, f));
|
|
|
+ }
|
|
|
} catch (ClosedChannelException e) {
|
|
|
LOG.warn("Volume " + v + " is closed, ignore the deletion task for " +
|
|
|
"block " + invalidBlks[i]);
|