|
@@ -65,11 +65,9 @@ import org.apache.hadoop.hdfs.server.common.DataNodeLockManager;
|
|
|
import org.apache.hadoop.hdfs.server.common.DataNodeLockManager.LockLevel;
|
|
|
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
|
|
|
import org.apache.hadoop.hdfs.server.datanode.DataSetLockManager;
|
|
|
-import org.apache.hadoop.hdfs.server.datanode.DataSetSubLockStrategy;
|
|
|
import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
|
|
|
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
|
|
|
import org.apache.hadoop.hdfs.server.datanode.LocalReplica;
|
|
|
-import org.apache.hadoop.hdfs.server.datanode.ModDataSetSubLockStrategy;
|
|
|
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
|
|
|
import org.apache.hadoop.util.AutoCloseableLock;
|
|
|
import org.apache.hadoop.hdfs.protocol.Block;
|
|
@@ -200,9 +198,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
@Override // FsDatasetSpi
|
|
|
public Block getStoredBlock(String bpid, long blkid)
|
|
|
throws IOException {
|
|
|
- try (AutoCloseableLock lock = lockManager.readLock(LockLevel.DIR,
|
|
|
- bpid, getReplicaInfo(bpid, blkid).getStorageUuid(),
|
|
|
- datasetSubLockStrategy.blockIdToSubLock(blkid))) {
|
|
|
+ try (AutoCloseableLock lock = lockManager.readLock(LockLevel.BLOCK_POOl,
|
|
|
+ bpid)) {
|
|
|
ReplicaInfo r = volumeMap.get(bpid, blkid);
|
|
|
if (r == null) {
|
|
|
return null;
|
|
@@ -291,9 +288,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
private long lastDirScannerNotifyTime;
|
|
|
private volatile long lastDirScannerFinishTime;
|
|
|
|
|
|
- private final DataSetSubLockStrategy datasetSubLockStrategy;
|
|
|
- private final long datasetSubLockCount;
|
|
|
-
|
|
|
/**
|
|
|
* An FSDataset has a directory where it loads its data files.
|
|
|
*/
|
|
@@ -398,9 +392,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_MAX_NOTIFY_COUNT_KEY,
|
|
|
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_MAX_NOTIFY_COUNT_DEFAULT);
|
|
|
lastDirScannerNotifyTime = System.currentTimeMillis();
|
|
|
- datasetSubLockCount = conf.getLong(DFSConfigKeys.DFS_DATANODE_DATASET_SUBLOCK_COUNT_KEY,
|
|
|
- DFSConfigKeys.DFS_DATANODE_DATASET_SUBLOCK_COUNT_DEFAULT);
|
|
|
- this.datasetSubLockStrategy = new ModDataSetSubLockStrategy(datasetSubLockCount);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -439,12 +430,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
FsVolumeReference ref) throws IOException {
|
|
|
for (String bp : volumeMap.getBlockPoolList()) {
|
|
|
lockManager.addLock(LockLevel.VOLUME, bp, ref.getVolume().getStorageID());
|
|
|
- List<String> allSubDirNameForDataSetLock = datasetSubLockStrategy.getAllSubLockName();
|
|
|
- for (String dir : allSubDirNameForDataSetLock) {
|
|
|
- lockManager.addLock(LockLevel.DIR, bp, ref.getVolume().getStorageID(), dir);
|
|
|
- LOG.info("Added DIR lock for bpid:{}, volume storageid:{}, dir:{}",
|
|
|
- bp, ref.getVolume().getStorageID(), dir);
|
|
|
- }
|
|
|
}
|
|
|
DatanodeStorage dnStorage = storageMap.get(sd.getStorageUuid());
|
|
|
if (dnStorage != null) {
|
|
@@ -644,12 +629,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
for (String storageUuid : storageToRemove) {
|
|
|
storageMap.remove(storageUuid);
|
|
|
for (String bp : volumeMap.getBlockPoolList()) {
|
|
|
- List<String> allSubDirNameForDataSetLock = datasetSubLockStrategy.getAllSubLockName();
|
|
|
- for (String dir : allSubDirNameForDataSetLock) {
|
|
|
- lockManager.removeLock(LockLevel.DIR, bp, storageUuid, dir);
|
|
|
- LOG.info("Removed DIR lock for bpid:{}, volume storageid:{}, dir:{}",
|
|
|
- bp, storageUuid, dir);
|
|
|
- }
|
|
|
lockManager.removeLock(LockLevel.VOLUME, bp, storageUuid);
|
|
|
}
|
|
|
}
|
|
@@ -840,9 +819,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
long seekOffset) throws IOException {
|
|
|
|
|
|
ReplicaInfo info;
|
|
|
- try (AutoCloseableLock lock = lockManager.readLock(LockLevel.DIR,
|
|
|
- b.getBlockPoolId(), getStorageUuidForLock(b),
|
|
|
- datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) {
|
|
|
+ try (AutoCloseableLock lock = lockManager.readLock(LockLevel.BLOCK_POOl,
|
|
|
+ b.getBlockPoolId())) {
|
|
|
info = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
|
|
|
}
|
|
|
|
|
@@ -936,9 +914,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
@Override // FsDatasetSpi
|
|
|
public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b,
|
|
|
long blkOffset, long metaOffset) throws IOException {
|
|
|
- try (AutoCloseDataSetLock l = lockManager.readLock(LockLevel.DIR,
|
|
|
- b.getBlockPoolId(), getStorageUuidForLock(b),
|
|
|
- datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) {
|
|
|
+ try (AutoCloseDataSetLock l = lockManager.readLock(LockLevel.VOLUME,
|
|
|
+ b.getBlockPoolId(), getStorageUuidForLock(b))) {
|
|
|
ReplicaInfo info = getReplicaInfo(b);
|
|
|
FsVolumeReference ref = info.getVolume().obtainReference();
|
|
|
try {
|
|
@@ -1403,9 +1380,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
@Override // FsDatasetSpi
|
|
|
public ReplicaHandler append(ExtendedBlock b,
|
|
|
long newGS, long expectedBlockLen) throws IOException {
|
|
|
- try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR,
|
|
|
- b.getBlockPoolId(), getStorageUuidForLock(b),
|
|
|
- datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) {
|
|
|
+ try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
|
|
|
+ b.getBlockPoolId(), getStorageUuidForLock(b))) {
|
|
|
// If the block was successfully finalized because all packets
|
|
|
// were successfully processed at the Datanode but the ack for
|
|
|
// some of the packets were not received by the client. The client
|
|
@@ -1457,9 +1433,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
private ReplicaInPipeline append(String bpid,
|
|
|
ReplicaInfo replicaInfo, long newGS, long estimateBlockLen)
|
|
|
throws IOException {
|
|
|
- try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR,
|
|
|
- bpid, replicaInfo.getStorageUuid(),
|
|
|
- datasetSubLockStrategy.blockIdToSubLock(replicaInfo.getBlockId()))) {
|
|
|
+ try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
|
|
|
+ bpid, replicaInfo.getStorageUuid())) {
|
|
|
// If the block is cached, start uncaching it.
|
|
|
if (replicaInfo.getState() != ReplicaState.FINALIZED) {
|
|
|
throw new IOException("Only a Finalized replica can be appended to; "
|
|
@@ -1555,9 +1530,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
|
|
|
while (true) {
|
|
|
try {
|
|
|
- try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR,
|
|
|
- b.getBlockPoolId(), getStorageUuidForLock(b),
|
|
|
- datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) {
|
|
|
+ try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl,
|
|
|
+ b.getBlockPoolId())) {
|
|
|
ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
|
|
|
FsVolumeReference ref = replicaInfo.getVolume().obtainReference();
|
|
|
ReplicaInPipeline replica;
|
|
@@ -1590,9 +1564,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
b, newGS, expectedBlockLen);
|
|
|
while (true) {
|
|
|
try {
|
|
|
- try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR,
|
|
|
- b.getBlockPoolId(), getStorageUuidForLock(b),
|
|
|
- datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) {
|
|
|
+ try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
|
|
|
+ b.getBlockPoolId(), getStorageUuidForLock(b))) {
|
|
|
// check replica's state
|
|
|
ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
|
|
|
// bump the replica's GS
|
|
@@ -1677,9 +1650,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
}
|
|
|
|
|
|
ReplicaInPipeline newReplicaInfo;
|
|
|
- try (AutoCloseableLock l = lockManager.writeLock(LockLevel.DIR,
|
|
|
- b.getBlockPoolId(), v.getStorageID(),
|
|
|
- datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) {
|
|
|
+ try (AutoCloseableLock l = lockManager.writeLock(LockLevel.VOLUME,
|
|
|
+ b.getBlockPoolId(), v.getStorageID())) {
|
|
|
newReplicaInfo = v.createRbw(b);
|
|
|
if (newReplicaInfo.getReplicaInfo().getState() != ReplicaState.RBW) {
|
|
|
throw new IOException("CreateRBW returned a replica of state "
|
|
@@ -1709,9 +1681,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
try {
|
|
|
while (true) {
|
|
|
try {
|
|
|
- try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR,
|
|
|
- b.getBlockPoolId(), getStorageUuidForLock(b),
|
|
|
- datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) {
|
|
|
+ try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
|
|
|
+ b.getBlockPoolId(), getStorageUuidForLock(b))) {
|
|
|
ReplicaInfo replicaInfo =
|
|
|
getReplicaInfo(b.getBlockPoolId(), b.getBlockId());
|
|
|
// check the replica's state
|
|
@@ -1742,9 +1713,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
private ReplicaHandler recoverRbwImpl(ReplicaInPipeline rbw,
|
|
|
ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd)
|
|
|
throws IOException {
|
|
|
- try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR,
|
|
|
- b.getBlockPoolId(), getStorageUuidForLock(b),
|
|
|
- datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) {
|
|
|
+ try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
|
|
|
+ b.getBlockPoolId(), getStorageUuidForLock(b))) {
|
|
|
// check generation stamp
|
|
|
long replicaGenerationStamp = rbw.getGenerationStamp();
|
|
|
if (replicaGenerationStamp < b.getGenerationStamp() ||
|
|
@@ -1805,9 +1775,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
public ReplicaInPipeline convertTemporaryToRbw(
|
|
|
final ExtendedBlock b) throws IOException {
|
|
|
long startTimeMs = Time.monotonicNow();
|
|
|
- try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR,
|
|
|
- b.getBlockPoolId(), getStorageUuidForLock(b),
|
|
|
- datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) {
|
|
|
+ try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
|
|
|
+ b.getBlockPoolId(), getStorageUuidForLock(b))) {
|
|
|
final long blockId = b.getBlockId();
|
|
|
final long expectedGs = b.getGenerationStamp();
|
|
|
final long visible = b.getNumBytes();
|
|
@@ -1946,9 +1915,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
.getNumBytes());
|
|
|
FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
|
|
|
ReplicaInPipeline newReplicaInfo;
|
|
|
- try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR,
|
|
|
- b.getBlockPoolId(), v.getStorageID(),
|
|
|
- datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) {
|
|
|
+ try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
|
|
|
+ b.getBlockPoolId(), v.getStorageID())) {
|
|
|
try {
|
|
|
newReplicaInfo = v.createTemporary(b);
|
|
|
LOG.debug("creating temporary for block: {} on volume: {}",
|
|
@@ -2005,9 +1973,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
ReplicaInfo replicaInfo = null;
|
|
|
ReplicaInfo finalizedReplicaInfo = null;
|
|
|
long startTimeMs = Time.monotonicNow();
|
|
|
- try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR,
|
|
|
- b.getBlockPoolId(), getStorageUuidForLock(b),
|
|
|
- datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) {
|
|
|
+ try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
|
|
|
+ b.getBlockPoolId(), getStorageUuidForLock(b))) {
|
|
|
if (Thread.interrupted()) {
|
|
|
// Don't allow data modifications from interrupted threads
|
|
|
throw new IOException("Cannot finalize block: " + b + " from Interrupted Thread");
|
|
@@ -2043,9 +2010,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
|
|
|
private ReplicaInfo finalizeReplica(String bpid, ReplicaInfo replicaInfo)
|
|
|
throws IOException {
|
|
|
- try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR,
|
|
|
- bpid, replicaInfo.getStorageUuid(),
|
|
|
- datasetSubLockStrategy.blockIdToSubLock(replicaInfo.getBlockId()))) {
|
|
|
+ try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
|
|
|
+ bpid, replicaInfo.getStorageUuid())) {
|
|
|
// Compare generation stamp of old and new replica before finalizing
|
|
|
if (volumeMap.get(bpid, replicaInfo.getBlockId()).getGenerationStamp()
|
|
|
> replicaInfo.getGenerationStamp()) {
|
|
@@ -2094,9 +2060,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
@Override // FsDatasetSpi
|
|
|
public void unfinalizeBlock(ExtendedBlock b) throws IOException {
|
|
|
long startTimeMs = Time.monotonicNow();
|
|
|
- try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR,
|
|
|
- b.getBlockPoolId(), getStorageUuidForLock(b),
|
|
|
- datasetSubLockStrategy.blockIdToSubLock(b.getBlockId()))) {
|
|
|
+ try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
|
|
|
+ b.getBlockPoolId(), getStorageUuidForLock(b))) {
|
|
|
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
|
|
|
b.getLocalBlock());
|
|
|
if (replicaInfo != null &&
|
|
@@ -2494,8 +2459,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
final String bpid = block.getBlockPoolId();
|
|
|
final Block localBlock = block.getLocalBlock();
|
|
|
final long blockId = localBlock.getBlockId();
|
|
|
- try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR, bpid, volume.getStorageID(),
|
|
|
- datasetSubLockStrategy.blockIdToSubLock(blockId))) {
|
|
|
+ try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
|
|
|
final ReplicaInfo info = volumeMap.get(bpid, localBlock);
|
|
|
if (info == null) {
|
|
|
ReplicaInfo infoByBlockId = volumeMap.get(bpid, blockId);
|
|
@@ -2584,8 +2548,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
bpid + ": ReplicaInfo not found.");
|
|
|
return;
|
|
|
}
|
|
|
- try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR, bpid,
|
|
|
- info.getStorageUuid(), datasetSubLockStrategy.blockIdToSubLock(blockId))) {
|
|
|
+ try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME, bpid,
|
|
|
+ info.getStorageUuid())) {
|
|
|
boolean success = false;
|
|
|
try {
|
|
|
info = volumeMap.get(bpid, blockId);
|
|
@@ -2782,8 +2746,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
lastDirScannerNotifyTime = startTimeMs;
|
|
|
}
|
|
|
String storageUuid = vol.getStorageID();
|
|
|
- try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.DIR, bpid,
|
|
|
- vol.getStorageID(), datasetSubLockStrategy.blockIdToSubLock(blockId))) {
|
|
|
+ try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME, bpid, storageUuid)) {
|
|
|
if (!storageMap.containsKey(storageUuid)) {
|
|
|
// Storage was already removed
|
|
|
return;
|
|
@@ -3268,9 +3231,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
@Override // FsDatasetSpi
|
|
|
public long getReplicaVisibleLength(final ExtendedBlock block)
|
|
|
throws IOException {
|
|
|
- try (AutoCloseableLock lock = lockManager.readLock(LockLevel.DIR,
|
|
|
- block.getBlockPoolId(), getStorageUuidForLock(block),
|
|
|
- datasetSubLockStrategy.blockIdToSubLock(block.getBlockId()))) {
|
|
|
+ try (AutoCloseableLock lock = lockManager.readLock(LockLevel.BLOCK_POOl,
|
|
|
+ block.getBlockPoolId())) {
|
|
|
final Replica replica = getReplicaInfo(block.getBlockPoolId(),
|
|
|
block.getBlockId());
|
|
|
if (replica.getGenerationStamp() < block.getGenerationStamp()) {
|
|
@@ -3297,12 +3259,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
Set<String> vols = storageMap.keySet();
|
|
|
for (String v : vols) {
|
|
|
lockManager.addLock(LockLevel.VOLUME, bpid, v);
|
|
|
- List<String> allSubDirNameForDataSetLock = datasetSubLockStrategy.getAllSubLockName();
|
|
|
- for (String dir : allSubDirNameForDataSetLock) {
|
|
|
- lockManager.addLock(LockLevel.DIR, bpid, v, dir);
|
|
|
- LOG.info("Added DIR lock for bpid:{}, volume storageid:{}, dir:{}",
|
|
|
- bpid, v, dir);
|
|
|
- }
|
|
|
}
|
|
|
}
|
|
|
try {
|
|
@@ -3430,9 +3386,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|
|
@Override // FsDatasetSpi
|
|
|
public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block)
|
|
|
throws IOException {
|
|
|
- try (AutoCloseableLock lock = lockManager.readLock(LockLevel.DIR,
|
|
|
- block.getBlockPoolId(), getStorageUuidForLock(block),
|
|
|
- datasetSubLockStrategy.blockIdToSubLock(block.getBlockId()))) {
|
|
|
+ try (AutoCloseableLock lock = lockManager.readLock(LockLevel.BLOCK_POOl,
|
|
|
+ block.getBlockPoolId())) {
|
|
|
final Replica replica = volumeMap.get(block.getBlockPoolId(),
|
|
|
block.getBlockId());
|
|
|
if (replica == null) {
|