|
@@ -948,7 +948,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
}
|
|
|
|
|
|
@Override // FsDatasetSpi
|
|
|
- public String recoverClose(ExtendedBlock b, long newGS,
|
|
|
+ public synchronized String recoverClose(ExtendedBlock b, long newGS,
|
|
|
long expectedBlockLen) throws IOException {
|
|
|
LOG.info("Recover failed close " + b);
|
|
|
// check replica's state
|
|
@@ -1152,9 +1152,17 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
ExtendedBlock b) throws IOException {
|
|
|
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getBlockId());
|
|
|
if (replicaInfo != null) {
|
|
|
- throw new ReplicaAlreadyExistsException("Block " + b +
|
|
|
- " already exists in state " + replicaInfo.getState() +
|
|
|
- " and thus cannot be created.");
|
|
|
+ if (replicaInfo.getGenerationStamp() < b.getGenerationStamp()
|
|
|
+ && replicaInfo instanceof ReplicaInPipeline) {
|
|
|
+ // Stop the previous writer
|
|
|
+ ((ReplicaInPipeline)replicaInfo)
|
|
|
+ .stopWriter(datanode.getDnConf().getXceiverStopTimeout());
|
|
|
+ invalidate(b.getBlockPoolId(), new Block[]{replicaInfo});
|
|
|
+ } else {
|
|
|
+ throw new ReplicaAlreadyExistsException("Block " + b +
|
|
|
+ " already exists in state " + replicaInfo.getState() +
|
|
|
+ " and thus cannot be created.");
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
FsVolumeImpl v = volumes.getNextVolume(storageType, b.getNumBytes());
|