|
@@ -1412,38 +1412,59 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
}
|
|
|
|
|
|
@Override // FsDatasetSpi
|
|
|
- public synchronized ReplicaHandler createTemporary(
|
|
|
+ public ReplicaHandler createTemporary(
|
|
|
StorageType storageType, ExtendedBlock b) throws IOException {
|
|
|
- ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getBlockId());
|
|
|
- if (replicaInfo != null) {
|
|
|
- if (replicaInfo.getGenerationStamp() < b.getGenerationStamp()
|
|
|
- && replicaInfo instanceof ReplicaInPipeline) {
|
|
|
- // Stop the previous writer
|
|
|
- ((ReplicaInPipeline)replicaInfo)
|
|
|
- .stopWriter(datanode.getDnConf().getXceiverStopTimeout());
|
|
|
- invalidate(b.getBlockPoolId(), new Block[]{replicaInfo});
|
|
|
- } else {
|
|
|
- throw new ReplicaAlreadyExistsException("Block " + b +
|
|
|
- " already exists in state " + replicaInfo.getState() +
|
|
|
- " and thus cannot be created.");
|
|
|
+ long startTimeMs = Time.monotonicNow();
|
|
|
+ long writerStopTimeoutMs = datanode.getDnConf().getXceiverStopTimeout();
|
|
|
+ ReplicaInfo lastFoundReplicaInfo = null;
|
|
|
+ do {
|
|
|
+ synchronized (this) {
|
|
|
+ ReplicaInfo currentReplicaInfo =
|
|
|
+ volumeMap.get(b.getBlockPoolId(), b.getBlockId());
|
|
|
+ if (currentReplicaInfo == lastFoundReplicaInfo) {
|
|
|
+ if (lastFoundReplicaInfo != null) {
|
|
|
+ invalidate(b.getBlockPoolId(), new Block[] { lastFoundReplicaInfo });
|
|
|
+ }
|
|
|
+ FsVolumeReference ref =
|
|
|
+ volumes.getNextVolume(storageType, b.getNumBytes());
|
|
|
+ FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
|
|
|
+ // create a temporary file to hold block in the designated volume
|
|
|
+ File f;
|
|
|
+ try {
|
|
|
+ f = v.createTmpFile(b.getBlockPoolId(), b.getLocalBlock());
|
|
|
+ } catch (IOException e) {
|
|
|
+ IOUtils.cleanup(null, ref);
|
|
|
+ throw e;
|
|
|
+ }
|
|
|
+ ReplicaInPipeline newReplicaInfo =
|
|
|
+ new ReplicaInPipeline(b.getBlockId(), b.getGenerationStamp(), v,
|
|
|
+ f.getParentFile(), 0);
|
|
|
+ volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
|
|
|
+ return new ReplicaHandler(newReplicaInfo, ref);
|
|
|
+ } else {
|
|
|
+ if (!(currentReplicaInfo.getGenerationStamp() < b
|
|
|
+ .getGenerationStamp() && currentReplicaInfo instanceof ReplicaInPipeline)) {
|
|
|
+ throw new ReplicaAlreadyExistsException("Block " + b
|
|
|
+ + " already exists in state " + currentReplicaInfo.getState()
|
|
|
+ + " and thus cannot be created.");
|
|
|
+ }
|
|
|
+ lastFoundReplicaInfo = currentReplicaInfo;
|
|
|
+ }
|
|
|
}
|
|
|
- }
|
|
|
|
|
|
- FsVolumeReference ref = volumes.getNextVolume(storageType, b.getNumBytes());
|
|
|
- FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
|
|
|
- // create a temporary file to hold block in the designated volume
|
|
|
- File f;
|
|
|
- try {
|
|
|
- f = v.createTmpFile(b.getBlockPoolId(), b.getLocalBlock());
|
|
|
- } catch (IOException e) {
|
|
|
- IOUtils.cleanup(null, ref);
|
|
|
- throw e;
|
|
|
- }
|
|
|
+ // Hang too long, just bail out. This is not supposed to happen.
|
|
|
+ long writerStopMs = Time.monotonicNow() - startTimeMs;
|
|
|
+ if (writerStopMs > writerStopTimeoutMs) {
|
|
|
+ LOG.warn("Unable to stop existing writer for block " + b + " after "
|
|
|
+ + writerStopMs + " miniseconds.");
|
|
|
+ throw new IOException("Unable to stop existing writer for block " + b
|
|
|
+ + " after " + writerStopMs + " miniseconds.");
|
|
|
+ }
|
|
|
|
|
|
- ReplicaInPipeline newReplicaInfo = new ReplicaInPipeline(b.getBlockId(),
|
|
|
- b.getGenerationStamp(), v, f.getParentFile(), 0);
|
|
|
- volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
|
|
|
- return new ReplicaHandler(newReplicaInfo, ref);
|
|
|
+ // Stop the previous writer
|
|
|
+ ((ReplicaInPipeline) lastFoundReplicaInfo)
|
|
|
+ .stopWriter(writerStopTimeoutMs);
|
|
|
+ } while (true);
|
|
|
}
|
|
|
|
|
|
/**
|