|
@@ -41,7 +41,6 @@ import java.util.concurrent.ConcurrentHashMap;
|
|
import java.util.concurrent.Executor;
|
|
import java.util.concurrent.Executor;
|
|
import java.util.concurrent.locks.Condition;
|
|
import java.util.concurrent.locks.Condition;
|
|
import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.TimeUnit;
|
|
-import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|
|
|
|
|
|
|
import javax.management.NotCompliantMBeanException;
|
|
import javax.management.NotCompliantMBeanException;
|
|
import javax.management.ObjectName;
|
|
import javax.management.ObjectName;
|
|
@@ -179,7 +178,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public FsVolumeImpl getVolume(final ExtendedBlock b) {
|
|
public FsVolumeImpl getVolume(final ExtendedBlock b) {
|
|
- try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
|
|
|
|
|
+ try (AutoCloseableLock lock = datasetReadLock.acquire()) {
|
|
final ReplicaInfo r =
|
|
final ReplicaInfo r =
|
|
volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
|
|
volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
|
|
return r != null ? (FsVolumeImpl) r.getVolume() : null;
|
|
return r != null ? (FsVolumeImpl) r.getVolume() : null;
|
|
@@ -189,7 +188,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
@Override // FsDatasetSpi
|
|
@Override // FsDatasetSpi
|
|
public Block getStoredBlock(String bpid, long blkid)
|
|
public Block getStoredBlock(String bpid, long blkid)
|
|
throws IOException {
|
|
throws IOException {
|
|
- try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
|
|
|
|
|
+ try (AutoCloseableLock lock = datasetReadLock.acquire()) {
|
|
ReplicaInfo r = volumeMap.get(bpid, blkid);
|
|
ReplicaInfo r = volumeMap.get(bpid, blkid);
|
|
if (r == null) {
|
|
if (r == null) {
|
|
return null;
|
|
return null;
|
|
@@ -206,7 +205,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
public Set<? extends Replica> deepCopyReplica(String bpid)
|
|
public Set<? extends Replica> deepCopyReplica(String bpid)
|
|
throws IOException {
|
|
throws IOException {
|
|
Set<? extends Replica> replicas = null;
|
|
Set<? extends Replica> replicas = null;
|
|
- try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
|
|
|
|
|
+ try (AutoCloseableLock lock = datasetReadLock.acquire()) {
|
|
replicas =
|
|
replicas =
|
|
new HashSet<>(volumeMap.replicas(bpid) == null ? Collections.EMPTY_SET
|
|
new HashSet<>(volumeMap.replicas(bpid) == null ? Collections.EMPTY_SET
|
|
: volumeMap.replicas(bpid));
|
|
: volumeMap.replicas(bpid));
|
|
@@ -302,7 +301,20 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
DFSConfigKeys.DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT,
|
|
DFSConfigKeys.DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT,
|
|
TimeUnit.MILLISECONDS));
|
|
TimeUnit.MILLISECONDS));
|
|
this.datasetWriteLock = new AutoCloseableLock(datasetRWLock.writeLock());
|
|
this.datasetWriteLock = new AutoCloseableLock(datasetRWLock.writeLock());
|
|
- this.datasetReadLock = new AutoCloseableLock(datasetRWLock.readLock());
|
|
|
|
|
|
+ boolean enableRL = conf.getBoolean(
|
|
|
|
+ DFSConfigKeys.DFS_DATANODE_LOCK_READ_WRITE_ENABLED_KEY,
|
|
|
|
+ DFSConfigKeys.DFS_DATANODE_LOCK_READ_WRITE_ENABLED_DEFAULT);
|
|
|
|
+ // The read lock can be disabled by the above config key. If it is disabled
|
|
|
|
+ // then we simply make both the read and write lock variables hold
|
|
|
|
+ // the write lock. All accesses to the lock are via these variables, so that
|
|
|
|
+ // effectively disables the read lock.
|
|
|
|
+ if (enableRL) {
|
|
|
|
+ LOG.info("The datanode lock is a read write lock");
|
|
|
|
+ this.datasetReadLock = new AutoCloseableLock(datasetRWLock.readLock());
|
|
|
|
+ } else {
|
|
|
|
+ LOG.info("The datanode lock is an exclusive write lock");
|
|
|
|
+ this.datasetReadLock = this.datasetWriteLock;
|
|
|
|
+ }
|
|
this.datasetWriteLockCondition = datasetWriteLock.newCondition();
|
|
this.datasetWriteLockCondition = datasetWriteLock.newCondition();
|
|
|
|
|
|
// The number of volumes required for operation is the total number
|
|
// The number of volumes required for operation is the total number
|
|
@@ -342,7 +354,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
}
|
|
}
|
|
|
|
|
|
storageMap = new ConcurrentHashMap<String, DatanodeStorage>();
|
|
storageMap = new ConcurrentHashMap<String, DatanodeStorage>();
|
|
- volumeMap = new ReplicaMap(datasetRWLock);
|
|
|
|
|
|
+ volumeMap = new ReplicaMap(datasetReadLock, datasetWriteLock);
|
|
ramDiskReplicaTracker = RamDiskReplicaTracker.getInstance(conf, this);
|
|
ramDiskReplicaTracker = RamDiskReplicaTracker.getInstance(conf, this);
|
|
|
|
|
|
@SuppressWarnings("unchecked")
|
|
@SuppressWarnings("unchecked")
|
|
@@ -475,7 +487,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
.setConf(this.conf)
|
|
.setConf(this.conf)
|
|
.build();
|
|
.build();
|
|
FsVolumeReference ref = fsVolume.obtainReference();
|
|
FsVolumeReference ref = fsVolume.obtainReference();
|
|
- ReplicaMap tempVolumeMap = new ReplicaMap(datasetRWLock);
|
|
|
|
|
|
+ ReplicaMap tempVolumeMap =
|
|
|
|
+ new ReplicaMap(datasetReadLock, datasetWriteLock);
|
|
fsVolume.getVolumeMap(tempVolumeMap, ramDiskReplicaTracker);
|
|
fsVolume.getVolumeMap(tempVolumeMap, ramDiskReplicaTracker);
|
|
|
|
|
|
activateVolume(tempVolumeMap, sd, storageLocation.getStorageType(), ref);
|
|
activateVolume(tempVolumeMap, sd, storageLocation.getStorageType(), ref);
|
|
@@ -515,7 +528,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
final FsVolumeImpl fsVolume =
|
|
final FsVolumeImpl fsVolume =
|
|
createFsVolume(sd.getStorageUuid(), sd, location);
|
|
createFsVolume(sd.getStorageUuid(), sd, location);
|
|
final ReplicaMap tempVolumeMap =
|
|
final ReplicaMap tempVolumeMap =
|
|
- new ReplicaMap(new ReentrantReadWriteLock());
|
|
|
|
|
|
+ new ReplicaMap(datasetReadLock, datasetWriteLock);
|
|
ArrayList<IOException> exceptions = Lists.newArrayList();
|
|
ArrayList<IOException> exceptions = Lists.newArrayList();
|
|
|
|
|
|
for (final NamespaceInfo nsInfo : nsInfos) {
|
|
for (final NamespaceInfo nsInfo : nsInfos) {
|
|
@@ -810,7 +823,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
long seekOffset) throws IOException {
|
|
long seekOffset) throws IOException {
|
|
|
|
|
|
ReplicaInfo info;
|
|
ReplicaInfo info;
|
|
- try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
|
|
|
|
|
+ try (AutoCloseableLock lock = datasetReadLock.acquire()) {
|
|
info = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
|
|
info = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
|
|
}
|
|
}
|
|
|
|
|
|
@@ -898,7 +911,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
@Override // FsDatasetSpi
|
|
@Override // FsDatasetSpi
|
|
public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b,
|
|
public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b,
|
|
long blkOffset, long metaOffset) throws IOException {
|
|
long blkOffset, long metaOffset) throws IOException {
|
|
- try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
|
|
|
|
|
+ try (AutoCloseableLock lock = datasetReadLock.acquire()) {
|
|
ReplicaInfo info = getReplicaInfo(b);
|
|
ReplicaInfo info = getReplicaInfo(b);
|
|
FsVolumeReference ref = info.getVolume().obtainReference();
|
|
FsVolumeReference ref = info.getVolume().obtainReference();
|
|
try {
|
|
try {
|
|
@@ -1023,7 +1036,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
}
|
|
}
|
|
|
|
|
|
FsVolumeReference volumeRef = null;
|
|
FsVolumeReference volumeRef = null;
|
|
- try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
|
|
|
|
|
+ try (AutoCloseableLock lock = datasetReadLock.acquire()) {
|
|
volumeRef = volumes.getNextVolume(targetStorageType, targetStorageId,
|
|
volumeRef = volumes.getNextVolume(targetStorageType, targetStorageId,
|
|
block.getNumBytes());
|
|
block.getNumBytes());
|
|
}
|
|
}
|
|
@@ -1137,7 +1150,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
|
|
|
FsVolumeReference volumeRef = null;
|
|
FsVolumeReference volumeRef = null;
|
|
|
|
|
|
- try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
|
|
|
|
|
+ try (AutoCloseableLock lock = datasetReadLock.acquire()) {
|
|
volumeRef = destination.obtainReference();
|
|
volumeRef = destination.obtainReference();
|
|
}
|
|
}
|
|
|
|
|
|
@@ -1891,7 +1904,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
new HashMap<String, BlockListAsLongs.Builder>();
|
|
new HashMap<String, BlockListAsLongs.Builder>();
|
|
|
|
|
|
List<FsVolumeImpl> curVolumes = null;
|
|
List<FsVolumeImpl> curVolumes = null;
|
|
- try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
|
|
|
|
|
+ try (AutoCloseableLock lock = datasetReadLock.acquire()) {
|
|
curVolumes = volumes.getVolumes();
|
|
curVolumes = volumes.getVolumes();
|
|
for (FsVolumeSpi v : curVolumes) {
|
|
for (FsVolumeSpi v : curVolumes) {
|
|
builders.put(v.getStorageID(), BlockListAsLongs.builder(maxDataLength));
|
|
builders.put(v.getStorageID(), BlockListAsLongs.builder(maxDataLength));
|
|
@@ -1954,7 +1967,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
*/
|
|
*/
|
|
@Override
|
|
@Override
|
|
public List<ReplicaInfo> getFinalizedBlocks(String bpid) {
|
|
public List<ReplicaInfo> getFinalizedBlocks(String bpid) {
|
|
- try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
|
|
|
|
|
+ try (AutoCloseableLock lock = datasetReadLock.acquire()) {
|
|
final List<ReplicaInfo> finalized = new ArrayList<ReplicaInfo>(
|
|
final List<ReplicaInfo> finalized = new ArrayList<ReplicaInfo>(
|
|
volumeMap.size(bpid));
|
|
volumeMap.size(bpid));
|
|
for (ReplicaInfo b : volumeMap.replicas(bpid)) {
|
|
for (ReplicaInfo b : volumeMap.replicas(bpid)) {
|
|
@@ -2047,9 +2060,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
ReplicaInfo validateBlockFile(String bpid, long blockId) {
|
|
ReplicaInfo validateBlockFile(String bpid, long blockId) {
|
|
//Should we check for metadata file too?
|
|
//Should we check for metadata file too?
|
|
final ReplicaInfo r;
|
|
final ReplicaInfo r;
|
|
- try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
|
|
|
- r = volumeMap.get(bpid, blockId);
|
|
|
|
- }
|
|
|
|
|
|
+ r = volumeMap.get(bpid, blockId);
|
|
if (r != null) {
|
|
if (r != null) {
|
|
if (r.blockDataExists()) {
|
|
if (r.blockDataExists()) {
|
|
return r;
|
|
return r;
|
|
@@ -2292,7 +2303,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
|
|
|
@Override // FsDatasetSpi
|
|
@Override // FsDatasetSpi
|
|
public boolean contains(final ExtendedBlock block) {
|
|
public boolean contains(final ExtendedBlock block) {
|
|
- try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
|
|
|
|
|
+ try (AutoCloseableLock lock = datasetReadLock.acquire()) {
|
|
final long blockId = block.getLocalBlock().getBlockId();
|
|
final long blockId = block.getLocalBlock().getBlockId();
|
|
final String bpid = block.getBlockPoolId();
|
|
final String bpid = block.getBlockPoolId();
|
|
final ReplicaInfo r = volumeMap.get(bpid, blockId);
|
|
final ReplicaInfo r = volumeMap.get(bpid, blockId);
|
|
@@ -2613,7 +2624,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public String getReplicaString(String bpid, long blockId) {
|
|
public String getReplicaString(String bpid, long blockId) {
|
|
- try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
|
|
|
|
|
+ try (AutoCloseableLock lock = datasetReadLock.acquire()) {
|
|
final Replica r = volumeMap.get(bpid, blockId);
|
|
final Replica r = volumeMap.get(bpid, blockId);
|
|
return r == null ? "null" : r.toString();
|
|
return r == null ? "null" : r.toString();
|
|
}
|
|
}
|
|
@@ -2833,7 +2844,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
@Override // FsDatasetSpi
|
|
@Override // FsDatasetSpi
|
|
public long getReplicaVisibleLength(final ExtendedBlock block)
|
|
public long getReplicaVisibleLength(final ExtendedBlock block)
|
|
throws IOException {
|
|
throws IOException {
|
|
- try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
|
|
|
|
|
+ try (AutoCloseableLock lock = datasetReadLock.acquire()) {
|
|
final Replica replica = getReplicaInfo(block.getBlockPoolId(),
|
|
final Replica replica = getReplicaInfo(block.getBlockPoolId(),
|
|
block.getBlockId());
|
|
block.getBlockId());
|
|
if (replica.getGenerationStamp() < block.getGenerationStamp()) {
|
|
if (replica.getGenerationStamp() < block.getGenerationStamp()) {
|
|
@@ -2983,18 +2994,20 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
@Override // FsDatasetSpi
|
|
@Override // FsDatasetSpi
|
|
public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block)
|
|
public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block)
|
|
throws IOException {
|
|
throws IOException {
|
|
- try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
|
|
|
|
|
|
+ try (AutoCloseableLock lock = datasetReadLock.acquire()) {
|
|
final Replica replica = volumeMap.get(block.getBlockPoolId(),
|
|
final Replica replica = volumeMap.get(block.getBlockPoolId(),
|
|
block.getBlockId());
|
|
block.getBlockId());
|
|
if (replica == null) {
|
|
if (replica == null) {
|
|
throw new ReplicaNotFoundException(block);
|
|
throw new ReplicaNotFoundException(block);
|
|
}
|
|
}
|
|
- if (replica.getGenerationStamp() < block.getGenerationStamp()) {
|
|
|
|
- throw new IOException(
|
|
|
|
- "Replica generation stamp < block generation stamp, block="
|
|
|
|
- + block + ", replica=" + replica);
|
|
|
|
- } else if (replica.getGenerationStamp() > block.getGenerationStamp()) {
|
|
|
|
- block.setGenerationStamp(replica.getGenerationStamp());
|
|
|
|
|
|
+ synchronized(replica) {
|
|
|
|
+ if (replica.getGenerationStamp() < block.getGenerationStamp()) {
|
|
|
|
+ throw new IOException(
|
|
|
|
+ "Replica generation stamp < block generation stamp, block="
|
|
|
|
+ + block + ", replica=" + replica);
|
|
|
|
+ } else if (replica.getGenerationStamp() > block.getGenerationStamp()) {
|
|
|
|
+ block.setGenerationStamp(replica.getGenerationStamp());
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|