|
@@ -182,7 +182,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public FsVolumeImpl getVolume(final ExtendedBlock b) {
|
|
public FsVolumeImpl getVolume(final ExtendedBlock b) {
|
|
- try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
|
|
|
|
|
+ try (AutoCloseableLock lock = datasetReadLock.acquire()) {
|
|
final ReplicaInfo r =
|
|
final ReplicaInfo r =
|
|
volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
|
|
volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
|
|
return r != null ? (FsVolumeImpl) r.getVolume() : null;
|
|
return r != null ? (FsVolumeImpl) r.getVolume() : null;
|
|
@@ -192,7 +192,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
@Override // FsDatasetSpi
|
|
@Override // FsDatasetSpi
|
|
public Block getStoredBlock(String bpid, long blkid)
|
|
public Block getStoredBlock(String bpid, long blkid)
|
|
throws IOException {
|
|
throws IOException {
|
|
- try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
|
|
|
|
|
+ try (AutoCloseableLock lock = datasetReadLock.acquire()) {
|
|
ReplicaInfo r = volumeMap.get(bpid, blkid);
|
|
ReplicaInfo r = volumeMap.get(bpid, blkid);
|
|
if (r == null) {
|
|
if (r == null) {
|
|
return null;
|
|
return null;
|
|
@@ -205,7 +205,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
public Set<? extends Replica> deepCopyReplica(String bpid)
|
|
public Set<? extends Replica> deepCopyReplica(String bpid)
|
|
throws IOException {
|
|
throws IOException {
|
|
Set<? extends Replica> replicas = null;
|
|
Set<? extends Replica> replicas = null;
|
|
- try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
|
|
|
|
|
+ try (AutoCloseableLock lock = datasetReadLock.acquire()) {
|
|
replicas = new HashSet<>(volumeMap.replicas(bpid) == null ? Collections.
|
|
replicas = new HashSet<>(volumeMap.replicas(bpid) == null ? Collections.
|
|
EMPTY_SET : volumeMap.replicas(bpid));
|
|
EMPTY_SET : volumeMap.replicas(bpid));
|
|
}
|
|
}
|
|
@@ -299,7 +299,20 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
DFSConfigKeys.DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT,
|
|
DFSConfigKeys.DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT,
|
|
TimeUnit.MILLISECONDS));
|
|
TimeUnit.MILLISECONDS));
|
|
this.datasetWriteLock = new AutoCloseableLock(datasetRWLock.writeLock());
|
|
this.datasetWriteLock = new AutoCloseableLock(datasetRWLock.writeLock());
|
|
- this.datasetReadLock = new AutoCloseableLock(datasetRWLock.readLock());
|
|
|
|
|
|
+ boolean enableRL = conf.getBoolean(
|
|
|
|
+ DFSConfigKeys.DFS_DATANODE_LOCK_READ_WRITE_ENABLED_KEY,
|
|
|
|
+ DFSConfigKeys.DFS_DATANODE_LOCK_READ_WRITE_ENABLED_DEFAULT);
|
|
|
|
+ // The read lock can be disabled by the above config key. If it is disabled
|
|
|
|
+ // then we simply make the both the read and write lock variables hold
|
|
|
|
+ // the write lock. All accesses to the lock are via these variables, so that
|
|
|
|
+ // effectively disables the read lock.
|
|
|
|
+ if (enableRL) {
|
|
|
|
+ LOG.info("The datanode lock is a read write lock");
|
|
|
|
+ this.datasetReadLock = new AutoCloseableLock(datasetRWLock.readLock());
|
|
|
|
+ } else {
|
|
|
|
+ LOG.info("The datanode lock is an exclusive write lock");
|
|
|
|
+ this.datasetReadLock = this.datasetWriteLock;
|
|
|
|
+ }
|
|
this.datasetWriteLockCondition = datasetWriteLock.newCondition();
|
|
this.datasetWriteLockCondition = datasetWriteLock.newCondition();
|
|
|
|
|
|
// The number of volumes required for operation is the total number
|
|
// The number of volumes required for operation is the total number
|
|
@@ -339,7 +352,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
}
|
|
}
|
|
|
|
|
|
storageMap = new ConcurrentHashMap<String, DatanodeStorage>();
|
|
storageMap = new ConcurrentHashMap<String, DatanodeStorage>();
|
|
- volumeMap = new ReplicaMap(datasetRWLock);
|
|
|
|
|
|
+ volumeMap = new ReplicaMap(datasetReadLock, datasetWriteLock);
|
|
ramDiskReplicaTracker = RamDiskReplicaTracker.getInstance(conf, this);
|
|
ramDiskReplicaTracker = RamDiskReplicaTracker.getInstance(conf, this);
|
|
|
|
|
|
@SuppressWarnings("unchecked")
|
|
@SuppressWarnings("unchecked")
|
|
@@ -472,7 +485,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
.setConf(this.conf)
|
|
.setConf(this.conf)
|
|
.build();
|
|
.build();
|
|
FsVolumeReference ref = fsVolume.obtainReference();
|
|
FsVolumeReference ref = fsVolume.obtainReference();
|
|
- ReplicaMap tempVolumeMap = new ReplicaMap(datasetRWLock);
|
|
|
|
|
|
+ ReplicaMap tempVolumeMap =
|
|
|
|
+ new ReplicaMap(datasetReadLock, datasetWriteLock);
|
|
fsVolume.getVolumeMap(tempVolumeMap, ramDiskReplicaTracker);
|
|
fsVolume.getVolumeMap(tempVolumeMap, ramDiskReplicaTracker);
|
|
|
|
|
|
activateVolume(tempVolumeMap, sd, storageLocation.getStorageType(), ref);
|
|
activateVolume(tempVolumeMap, sd, storageLocation.getStorageType(), ref);
|
|
@@ -807,7 +821,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
long seekOffset) throws IOException {
|
|
long seekOffset) throws IOException {
|
|
|
|
|
|
ReplicaInfo info;
|
|
ReplicaInfo info;
|
|
- try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
|
|
|
|
|
+ try (AutoCloseableLock lock = datasetReadLock.acquire()) {
|
|
info = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
|
|
info = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
|
|
}
|
|
}
|
|
|
|
|
|
@@ -895,7 +909,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
@Override // FsDatasetSpi
|
|
@Override // FsDatasetSpi
|
|
public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b,
|
|
public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b,
|
|
long blkOffset, long metaOffset) throws IOException {
|
|
long blkOffset, long metaOffset) throws IOException {
|
|
- try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
|
|
|
|
|
+ try (AutoCloseableLock lock = datasetReadLock.acquire()) {
|
|
ReplicaInfo info = getReplicaInfo(b);
|
|
ReplicaInfo info = getReplicaInfo(b);
|
|
FsVolumeReference ref = info.getVolume().obtainReference();
|
|
FsVolumeReference ref = info.getVolume().obtainReference();
|
|
try {
|
|
try {
|
|
@@ -1020,7 +1034,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
}
|
|
}
|
|
|
|
|
|
FsVolumeReference volumeRef = null;
|
|
FsVolumeReference volumeRef = null;
|
|
- try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
|
|
|
|
|
+ try (AutoCloseableLock lock = datasetReadLock.acquire()) {
|
|
volumeRef = volumes.getNextVolume(targetStorageType, targetStorageId,
|
|
volumeRef = volumes.getNextVolume(targetStorageType, targetStorageId,
|
|
block.getNumBytes());
|
|
block.getNumBytes());
|
|
}
|
|
}
|
|
@@ -1134,7 +1148,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
|
|
|
FsVolumeReference volumeRef = null;
|
|
FsVolumeReference volumeRef = null;
|
|
|
|
|
|
- try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
|
|
|
|
|
+ try (AutoCloseableLock lock = datasetReadLock.acquire()) {
|
|
volumeRef = destination.obtainReference();
|
|
volumeRef = destination.obtainReference();
|
|
}
|
|
}
|
|
|
|
|
|
@@ -1888,7 +1902,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
new HashMap<String, BlockListAsLongs.Builder>();
|
|
new HashMap<String, BlockListAsLongs.Builder>();
|
|
|
|
|
|
List<FsVolumeImpl> curVolumes = null;
|
|
List<FsVolumeImpl> curVolumes = null;
|
|
- try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
|
|
|
|
|
+ try (AutoCloseableLock lock = datasetReadLock.acquire()) {
|
|
curVolumes = volumes.getVolumes();
|
|
curVolumes = volumes.getVolumes();
|
|
for (FsVolumeSpi v : curVolumes) {
|
|
for (FsVolumeSpi v : curVolumes) {
|
|
builders.put(v.getStorageID(), BlockListAsLongs.builder(maxDataLength));
|
|
builders.put(v.getStorageID(), BlockListAsLongs.builder(maxDataLength));
|
|
@@ -1952,7 +1966,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
*/
|
|
*/
|
|
@Override
|
|
@Override
|
|
public List<ReplicaInfo> getSortedFinalizedBlocks(String bpid) {
|
|
public List<ReplicaInfo> getSortedFinalizedBlocks(String bpid) {
|
|
- try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
|
|
|
|
|
+ try (AutoCloseableLock lock = datasetReadLock.acquire()) {
|
|
final List<ReplicaInfo> finalized = new ArrayList<ReplicaInfo>(
|
|
final List<ReplicaInfo> finalized = new ArrayList<ReplicaInfo>(
|
|
volumeMap.size(bpid));
|
|
volumeMap.size(bpid));
|
|
for (ReplicaInfo b : volumeMap.replicas(bpid)) {
|
|
for (ReplicaInfo b : volumeMap.replicas(bpid)) {
|
|
@@ -2045,9 +2059,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
ReplicaInfo validateBlockFile(String bpid, long blockId) {
|
|
ReplicaInfo validateBlockFile(String bpid, long blockId) {
|
|
//Should we check for metadata file too?
|
|
//Should we check for metadata file too?
|
|
final ReplicaInfo r;
|
|
final ReplicaInfo r;
|
|
- try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
|
|
|
- r = volumeMap.get(bpid, blockId);
|
|
|
|
- }
|
|
|
|
|
|
+ r = volumeMap.get(bpid, blockId);
|
|
if (r != null) {
|
|
if (r != null) {
|
|
if (r.blockDataExists()) {
|
|
if (r.blockDataExists()) {
|
|
return r;
|
|
return r;
|
|
@@ -2290,7 +2302,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
|
|
|
@Override // FsDatasetSpi
|
|
@Override // FsDatasetSpi
|
|
public boolean contains(final ExtendedBlock block) {
|
|
public boolean contains(final ExtendedBlock block) {
|
|
- try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
|
|
|
|
|
+ try (AutoCloseableLock lock = datasetReadLock.acquire()) {
|
|
final long blockId = block.getLocalBlock().getBlockId();
|
|
final long blockId = block.getLocalBlock().getBlockId();
|
|
final String bpid = block.getBlockPoolId();
|
|
final String bpid = block.getBlockPoolId();
|
|
final ReplicaInfo r = volumeMap.get(bpid, blockId);
|
|
final ReplicaInfo r = volumeMap.get(bpid, blockId);
|
|
@@ -2611,7 +2623,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public String getReplicaString(String bpid, long blockId) {
|
|
public String getReplicaString(String bpid, long blockId) {
|
|
- try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
|
|
|
|
|
+ try (AutoCloseableLock lock = datasetReadLock.acquire()) {
|
|
final Replica r = volumeMap.get(bpid, blockId);
|
|
final Replica r = volumeMap.get(bpid, blockId);
|
|
return r == null ? "null" : r.toString();
|
|
return r == null ? "null" : r.toString();
|
|
}
|
|
}
|
|
@@ -2831,7 +2843,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
@Override // FsDatasetSpi
|
|
@Override // FsDatasetSpi
|
|
public long getReplicaVisibleLength(final ExtendedBlock block)
|
|
public long getReplicaVisibleLength(final ExtendedBlock block)
|
|
throws IOException {
|
|
throws IOException {
|
|
- try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
|
|
|
|
|
+ try (AutoCloseableLock lock = datasetReadLock.acquire()) {
|
|
final Replica replica = getReplicaInfo(block.getBlockPoolId(),
|
|
final Replica replica = getReplicaInfo(block.getBlockPoolId(),
|
|
block.getBlockId());
|
|
block.getBlockId());
|
|
if (replica.getGenerationStamp() < block.getGenerationStamp()) {
|
|
if (replica.getGenerationStamp() < block.getGenerationStamp()) {
|
|
@@ -2981,18 +2993,20 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
@Override // FsDatasetSpi
|
|
@Override // FsDatasetSpi
|
|
public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block)
|
|
public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block)
|
|
throws IOException {
|
|
throws IOException {
|
|
- try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
|
|
|
|
|
+ try (AutoCloseableLock lock = datasetReadLock.acquire()) {
|
|
final Replica replica = volumeMap.get(block.getBlockPoolId(),
|
|
final Replica replica = volumeMap.get(block.getBlockPoolId(),
|
|
block.getBlockId());
|
|
block.getBlockId());
|
|
if (replica == null) {
|
|
if (replica == null) {
|
|
throw new ReplicaNotFoundException(block);
|
|
throw new ReplicaNotFoundException(block);
|
|
}
|
|
}
|
|
- if (replica.getGenerationStamp() < block.getGenerationStamp()) {
|
|
|
|
- throw new IOException(
|
|
|
|
- "Replica generation stamp < block generation stamp, block="
|
|
|
|
- + block + ", replica=" + replica);
|
|
|
|
- } else if (replica.getGenerationStamp() > block.getGenerationStamp()) {
|
|
|
|
- block.setGenerationStamp(replica.getGenerationStamp());
|
|
|
|
|
|
+ synchronized(replica) {
|
|
|
|
+ if (replica.getGenerationStamp() < block.getGenerationStamp()) {
|
|
|
|
+ throw new IOException(
|
|
|
|
+ "Replica generation stamp < block generation stamp, block="
|
|
|
|
+ + block + ", replica=" + replica);
|
|
|
|
+ } else if (replica.getGenerationStamp() > block.getGenerationStamp()) {
|
|
|
|
+ block.setGenerationStamp(replica.getGenerationStamp());
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|