|
@@ -61,14 +61,10 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
|
|
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
|
|
|
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
|
|
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
|
|
|
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
|
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
|
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
|
|
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
|
|
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
|
|
|
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs;
|
|
|
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
|
|
|
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
|
|
|
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
|
|
@@ -86,13 +82,13 @@ import org.apache.hadoop.util.ReflectionUtils;
|
|
|
*
|
|
|
***************************************************/
|
|
|
@InterfaceAudience.Private
|
|
|
-public class FSDataset implements FsDatasetSpi<FSDataset.FSVolume> {
|
|
|
+class FSDataset implements FSDatasetInterface<FSDataset.FSVolume> {
|
|
|
/**
|
|
|
* A factory for creating FSDataset objects.
|
|
|
*/
|
|
|
- public static class Factory extends FsDatasetSpi.Factory<FSDataset> {
|
|
|
+ static class Factory extends FSDatasetInterface.Factory<FSDataset> {
|
|
|
@Override
|
|
|
- public FSDataset newInstance(DataNode datanode,
|
|
|
+ public FSDataset createFSDatasetInterface(DataNode datanode,
|
|
|
DataStorage storage, Configuration conf) throws IOException {
|
|
|
return new FSDataset(datanode, storage, conf);
|
|
|
}
|
|
@@ -827,11 +823,11 @@ public class FSDataset implements FsDatasetSpi<FSDataset.FSVolume> {
|
|
|
*/
|
|
|
private volatile List<FSVolume> volumes = null;
|
|
|
|
|
|
- final VolumeChoosingPolicy<FSVolume> blockChooser;
|
|
|
+ BlockVolumeChoosingPolicy<FSVolume> blockChooser;
|
|
|
int numFailedVolumes;
|
|
|
|
|
|
FSVolumeSet(List<FSVolume> volumes, int failedVols,
|
|
|
- VolumeChoosingPolicy<FSVolume> blockChooser) {
|
|
|
+ BlockVolumeChoosingPolicy<FSVolume> blockChooser) {
|
|
|
this.volumes = Collections.unmodifiableList(volumes);
|
|
|
this.blockChooser = blockChooser;
|
|
|
this.numFailedVolumes = failedVols;
|
|
@@ -1022,7 +1018,7 @@ public class FSDataset implements FsDatasetSpi<FSDataset.FSVolume> {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- @Override // FsDatasetSpi
|
|
|
+ @Override // FSDatasetInterface
|
|
|
public List<FSVolume> getVolumes() {
|
|
|
return volumes.volumes;
|
|
|
}
|
|
@@ -1033,7 +1029,7 @@ public class FSDataset implements FsDatasetSpi<FSDataset.FSVolume> {
|
|
|
return r != null? (FSVolume)r.getVolume(): null;
|
|
|
}
|
|
|
|
|
|
- @Override // FsDatasetSpi
|
|
|
+ @Override // FSDatasetInterface
|
|
|
public synchronized Block getStoredBlock(String bpid, long blkid)
|
|
|
throws IOException {
|
|
|
File blockfile = getFile(bpid, blkid);
|
|
@@ -1070,7 +1066,7 @@ public class FSDataset implements FsDatasetSpi<FSDataset.FSVolume> {
|
|
|
return null;
|
|
|
}
|
|
|
|
|
|
- @Override // FsDatasetSpi
|
|
|
+ @Override // FSDatasetInterface
|
|
|
public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
|
|
|
throws IOException {
|
|
|
final File meta = getMetaFile(b);
|
|
@@ -1129,11 +1125,11 @@ public class FSDataset implements FsDatasetSpi<FSDataset.FSVolume> {
|
|
|
volumeMap = new ReplicasMap(this);
|
|
|
|
|
|
@SuppressWarnings("unchecked")
|
|
|
- final VolumeChoosingPolicy<FSVolume> blockChooserImpl =
|
|
|
+ final BlockVolumeChoosingPolicy<FSVolume> blockChooserImpl =
|
|
|
ReflectionUtils.newInstance(conf.getClass(
|
|
|
- DFSConfigKeys.DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY,
|
|
|
- RoundRobinVolumeChoosingPolicy.class,
|
|
|
- VolumeChoosingPolicy.class), conf);
|
|
|
+ DFSConfigKeys.DFS_DATANODE_BLOCKVOLUMECHOICEPOLICY,
|
|
|
+ RoundRobinVolumesPolicy.class,
|
|
|
+ BlockVolumeChoosingPolicy.class), conf);
|
|
|
volumes = new FSVolumeSet(volArray, volsFailed, blockChooserImpl);
|
|
|
volumes.getVolumeMap(volumeMap);
|
|
|
|
|
@@ -1168,7 +1164,7 @@ public class FSDataset implements FsDatasetSpi<FSDataset.FSVolume> {
|
|
|
/**
|
|
|
* Return true - if there are still valid volumes on the DataNode.
|
|
|
*/
|
|
|
- @Override // FsDatasetSpi
|
|
|
+ @Override // FSDatasetInterface
|
|
|
public boolean hasEnoughResource() {
|
|
|
return getVolumes().size() >= validVolsRequired;
|
|
|
}
|
|
@@ -1203,7 +1199,7 @@ public class FSDataset implements FsDatasetSpi<FSDataset.FSVolume> {
|
|
|
/**
|
|
|
* Find the block's on-disk length
|
|
|
*/
|
|
|
- @Override // FsDatasetSpi
|
|
|
+ @Override // FSDatasetInterface
|
|
|
public long getLength(ExtendedBlock b) throws IOException {
|
|
|
return getBlockFile(b).length();
|
|
|
}
|
|
@@ -1247,7 +1243,7 @@ public class FSDataset implements FsDatasetSpi<FSDataset.FSVolume> {
|
|
|
return f;
|
|
|
}
|
|
|
|
|
|
- @Override // FsDatasetSpi
|
|
|
+ @Override // FSDatasetInterface
|
|
|
public InputStream getBlockInputStream(ExtendedBlock b,
|
|
|
long seekOffset) throws IOException {
|
|
|
File blockFile = getBlockFileNoExistsCheck(b);
|
|
@@ -1305,7 +1301,7 @@ public class FSDataset implements FsDatasetSpi<FSDataset.FSVolume> {
|
|
|
/**
|
|
|
* Returns handles to the block file and its metadata file
|
|
|
*/
|
|
|
- @Override // FsDatasetSpi
|
|
|
+ @Override // FSDatasetInterface
|
|
|
public synchronized ReplicaInputStreams getTmpInputStreams(ExtendedBlock b,
|
|
|
long blkOffset, long ckoff) throws IOException {
|
|
|
ReplicaInfo info = getReplicaInfo(b);
|
|
@@ -1410,7 +1406,7 @@ public class FSDataset implements FsDatasetSpi<FSDataset.FSVolume> {
|
|
|
}
|
|
|
|
|
|
|
|
|
- @Override // FsDatasetSpi
|
|
|
+ @Override // FSDatasetInterface
|
|
|
public synchronized ReplicaInPipelineInterface append(ExtendedBlock b,
|
|
|
long newGS, long expectedBlockLen) throws IOException {
|
|
|
// If the block was successfully finalized because all packets
|
|
@@ -1551,7 +1547,7 @@ public class FSDataset implements FsDatasetSpi<FSDataset.FSVolume> {
|
|
|
return replicaInfo;
|
|
|
}
|
|
|
|
|
|
- @Override // FsDatasetSpi
|
|
|
+ @Override // FSDatasetInterface
|
|
|
public synchronized ReplicaInPipelineInterface recoverAppend(ExtendedBlock b,
|
|
|
long newGS, long expectedBlockLen) throws IOException {
|
|
|
DataNode.LOG.info("Recover failed append to " + b);
|
|
@@ -1568,7 +1564,7 @@ public class FSDataset implements FsDatasetSpi<FSDataset.FSVolume> {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- @Override // FsDatasetSpi
|
|
|
+ @Override // FSDatasetInterface
|
|
|
public void recoverClose(ExtendedBlock b, long newGS,
|
|
|
long expectedBlockLen) throws IOException {
|
|
|
DataNode.LOG.info("Recover failed close " + b);
|
|
@@ -1610,7 +1606,7 @@ public class FSDataset implements FsDatasetSpi<FSDataset.FSVolume> {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- @Override // FsDatasetSpi
|
|
|
+ @Override // FSDatasetInterface
|
|
|
public synchronized ReplicaInPipelineInterface createRbw(ExtendedBlock b)
|
|
|
throws IOException {
|
|
|
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
|
|
@@ -1630,7 +1626,7 @@ public class FSDataset implements FsDatasetSpi<FSDataset.FSVolume> {
|
|
|
return newReplicaInfo;
|
|
|
}
|
|
|
|
|
|
- @Override // FsDatasetSpi
|
|
|
+ @Override // FSDatasetInterface
|
|
|
public synchronized ReplicaInPipelineInterface recoverRbw(ExtendedBlock b,
|
|
|
long newGS, long minBytesRcvd, long maxBytesRcvd)
|
|
|
throws IOException {
|
|
@@ -1675,7 +1671,7 @@ public class FSDataset implements FsDatasetSpi<FSDataset.FSVolume> {
|
|
|
return rbw;
|
|
|
}
|
|
|
|
|
|
- @Override // FsDatasetSpi
|
|
|
+ @Override // FSDatasetInterface
|
|
|
public synchronized ReplicaInPipelineInterface convertTemporaryToRbw(
|
|
|
final ExtendedBlock b) throws IOException {
|
|
|
final long blockId = b.getBlockId();
|
|
@@ -1736,7 +1732,7 @@ public class FSDataset implements FsDatasetSpi<FSDataset.FSVolume> {
|
|
|
return rbw;
|
|
|
}
|
|
|
|
|
|
- @Override // FsDatasetSpi
|
|
|
+ @Override // FSDatasetInterface
|
|
|
public synchronized ReplicaInPipelineInterface createTemporary(ExtendedBlock b)
|
|
|
throws IOException {
|
|
|
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getBlockId());
|
|
@@ -1760,7 +1756,7 @@ public class FSDataset implements FsDatasetSpi<FSDataset.FSVolume> {
|
|
|
* Sets the offset in the meta file so that the
|
|
|
* last checksum will be overwritten.
|
|
|
*/
|
|
|
- @Override // FsDatasetSpi
|
|
|
+ @Override // FSDatasetInterface
|
|
|
public void adjustCrcChannelPosition(ExtendedBlock b, ReplicaOutputStreams streams,
|
|
|
int checksumSize) throws IOException {
|
|
|
FileOutputStream file = (FileOutputStream) streams.getChecksumOut();
|
|
@@ -1785,7 +1781,7 @@ public class FSDataset implements FsDatasetSpi<FSDataset.FSVolume> {
|
|
|
/**
|
|
|
* Complete the block write!
|
|
|
*/
|
|
|
- @Override // FsDatasetSpi
|
|
|
+ @Override // FSDatasetInterface
|
|
|
public synchronized void finalizeBlock(ExtendedBlock b) throws IOException {
|
|
|
ReplicaInfo replicaInfo = getReplicaInfo(b);
|
|
|
if (replicaInfo.getState() == ReplicaState.FINALIZED) {
|
|
@@ -1822,7 +1818,7 @@ public class FSDataset implements FsDatasetSpi<FSDataset.FSVolume> {
|
|
|
/**
|
|
|
* Remove the temporary block file (if any)
|
|
|
*/
|
|
|
- @Override // FsDatasetSpi
|
|
|
+ @Override // FSDatasetInterface
|
|
|
public synchronized void unfinalizeBlock(ExtendedBlock b) throws IOException {
|
|
|
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
|
|
|
b.getLocalBlock());
|
|
@@ -1867,7 +1863,7 @@ public class FSDataset implements FsDatasetSpi<FSDataset.FSVolume> {
|
|
|
/**
|
|
|
* Generates a block report from the in-memory block map.
|
|
|
*/
|
|
|
- @Override // FsDatasetSpi
|
|
|
+ @Override // FSDatasetInterface
|
|
|
public BlockListAsLongs getBlockReport(String bpid) {
|
|
|
int size = volumeMap.size(bpid);
|
|
|
ArrayList<ReplicaInfo> finalized = new ArrayList<ReplicaInfo>(size);
|
|
@@ -1918,7 +1914,7 @@ public class FSDataset implements FsDatasetSpi<FSDataset.FSVolume> {
|
|
|
* Check whether the given block is a valid one.
|
|
|
* valid means finalized
|
|
|
*/
|
|
|
- @Override // FsDatasetSpi
|
|
|
+ @Override // FSDatasetInterface
|
|
|
public boolean isValidBlock(ExtendedBlock b) {
|
|
|
return isValid(b, ReplicaState.FINALIZED);
|
|
|
}
|
|
@@ -1926,7 +1922,7 @@ public class FSDataset implements FsDatasetSpi<FSDataset.FSVolume> {
|
|
|
/**
|
|
|
* Check whether the given block is a valid RBW.
|
|
|
*/
|
|
|
- @Override // {@link FsDatasetSpi}
|
|
|
+ @Override // {@link FSDatasetInterface}
|
|
|
public boolean isValidRbw(final ExtendedBlock b) {
|
|
|
return isValid(b, ReplicaState.RBW);
|
|
|
}
|
|
@@ -1991,7 +1987,7 @@ public class FSDataset implements FsDatasetSpi<FSDataset.FSVolume> {
|
|
|
* could lazily garbage-collect the block, but why bother?
|
|
|
* just get rid of it.
|
|
|
*/
|
|
|
- @Override // FsDatasetSpi
|
|
|
+ @Override // FSDatasetInterface
|
|
|
public void invalidate(String bpid, Block invalidBlks[]) throws IOException {
|
|
|
boolean error = false;
|
|
|
for (int i = 0; i < invalidBlks.length; i++) {
|
|
@@ -2057,7 +2053,7 @@ public class FSDataset implements FsDatasetSpi<FSDataset.FSVolume> {
|
|
|
datanode.notifyNamenodeDeletedBlock(block);
|
|
|
}
|
|
|
|
|
|
- @Override // {@link FsDatasetSpi}
|
|
|
+ @Override // {@link FSDatasetInterface}
|
|
|
public synchronized boolean contains(final ExtendedBlock block) {
|
|
|
final long blockId = block.getLocalBlock().getBlockId();
|
|
|
return getFile(block.getBlockPoolId(), blockId) != null;
|
|
@@ -2082,7 +2078,7 @@ public class FSDataset implements FsDatasetSpi<FSDataset.FSVolume> {
|
|
|
* to these volumes
|
|
|
* @throws DiskErrorException
|
|
|
*/
|
|
|
- @Override // FsDatasetSpi
|
|
|
+ @Override // FSDatasetInterface
|
|
|
public void checkDataDir() throws DiskErrorException {
|
|
|
long totalBlocks=0, removedBlocks=0;
|
|
|
List<FSVolume> failedVols = volumes.checkDirs();
|
|
@@ -2126,7 +2122,7 @@ public class FSDataset implements FsDatasetSpi<FSDataset.FSVolume> {
|
|
|
}
|
|
|
|
|
|
|
|
|
- @Override // FsDatasetSpi
|
|
|
+ @Override // FSDatasetInterface
|
|
|
public String toString() {
|
|
|
return "FSDataset{dirpath='"+volumes+"'}";
|
|
|
}
|
|
@@ -2157,7 +2153,7 @@ public class FSDataset implements FsDatasetSpi<FSDataset.FSVolume> {
|
|
|
DataNode.LOG.info("Registered FSDatasetState MBean");
|
|
|
}
|
|
|
|
|
|
- @Override // FsDatasetSpi
|
|
|
+ @Override // FSDatasetInterface
|
|
|
public void shutdown() {
|
|
|
if (mbeanName != null)
|
|
|
MBeans.unregister(mbeanName);
|
|
@@ -2338,7 +2334,7 @@ public class FSDataset implements FsDatasetSpi<FSDataset.FSVolume> {
|
|
|
/**
|
|
|
* @deprecated use {@link #fetchReplicaInfo(String, long)} instead.
|
|
|
*/
|
|
|
- @Override // FsDatasetSpi
|
|
|
+ @Override // FSDatasetInterface
|
|
|
@Deprecated
|
|
|
public ReplicaInfo getReplica(String bpid, long blockId) {
|
|
|
return volumeMap.get(bpid, blockId);
|
|
@@ -2350,7 +2346,7 @@ public class FSDataset implements FsDatasetSpi<FSDataset.FSVolume> {
|
|
|
return r == null? "null": r.toString();
|
|
|
}
|
|
|
|
|
|
- @Override // FsDatasetSpi
|
|
|
+ @Override // FSDatasetInterface
|
|
|
public synchronized ReplicaRecoveryInfo initReplicaRecovery(
|
|
|
RecoveringBlock rBlock) throws IOException {
|
|
|
return initReplicaRecovery(rBlock.getBlock().getBlockPoolId(),
|
|
@@ -2423,7 +2419,7 @@ public class FSDataset implements FsDatasetSpi<FSDataset.FSVolume> {
|
|
|
return rur.createInfo();
|
|
|
}
|
|
|
|
|
|
- @Override // FsDatasetSpi
|
|
|
+ @Override // FSDatasetInterface
|
|
|
public synchronized String updateReplicaUnderRecovery(
|
|
|
final ExtendedBlock oldBlock,
|
|
|
final long recoveryId,
|
|
@@ -2505,7 +2501,7 @@ public class FSDataset implements FsDatasetSpi<FSDataset.FSVolume> {
|
|
|
return finalizeReplica(bpid, rur);
|
|
|
}
|
|
|
|
|
|
- @Override // FsDatasetSpi
|
|
|
+ @Override // FSDatasetInterface
|
|
|
public synchronized long getReplicaVisibleLength(final ExtendedBlock block)
|
|
|
throws IOException {
|
|
|
final Replica replica = getReplicaInfo(block.getBlockPoolId(),
|
|
@@ -2588,7 +2584,7 @@ public class FSDataset implements FsDatasetSpi<FSDataset.FSVolume> {
|
|
|
return info;
|
|
|
}
|
|
|
|
|
|
- @Override //FsDatasetSpi
|
|
|
+ @Override //FSDatasetInterface
|
|
|
public synchronized void deleteBlockPool(String bpid, boolean force)
|
|
|
throws IOException {
|
|
|
if (!force) {
|
|
@@ -2606,7 +2602,7 @@ public class FSDataset implements FsDatasetSpi<FSDataset.FSVolume> {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- @Override // FsDatasetSpi
|
|
|
+ @Override // FSDatasetInterface
|
|
|
public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block)
|
|
|
throws IOException {
|
|
|
File datafile = getBlockFile(block);
|