|
@@ -17,6 +17,8 @@
|
|
|
*/
|
|
|
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
|
|
|
|
|
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
|
|
+
|
|
|
import static org.apache.hadoop.fs.CreateFlag.CREATE;
|
|
|
import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST;
|
|
|
import static org.apache.hadoop.fs.StorageType.DEFAULT;
|
|
@@ -45,6 +47,7 @@ import java.util.List;
|
|
|
import java.util.Set;
|
|
|
import java.util.UUID;
|
|
|
|
|
|
+import com.google.common.base.Preconditions;
|
|
|
import org.apache.commons.io.IOUtils;
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
@@ -228,11 +231,15 @@ public abstract class LazyPersistTestCase {
|
|
|
* If ramDiskStorageLimit is >=0, then RAM_DISK capacity is artificially
|
|
|
* capped. If ramDiskStorageLimit < 0 then it is ignored.
|
|
|
*/
|
|
|
- protected final void startUpCluster(boolean hasTransientStorage,
|
|
|
- final int ramDiskReplicaCapacity,
|
|
|
- final boolean useSCR,
|
|
|
- final boolean useLegacyBlockReaderLocal)
|
|
|
- throws IOException {
|
|
|
+ protected final void startUpCluster(
|
|
|
+ int numDatanodes,
|
|
|
+ boolean hasTransientStorage,
|
|
|
+ StorageType[] storageTypes,
|
|
|
+ int ramDiskReplicaCapacity,
|
|
|
+ long ramDiskStorageLimit,
|
|
|
+ long evictionLowWatermarkReplicas,
|
|
|
+ boolean useSCR,
|
|
|
+ boolean useLegacyBlockReaderLocal) throws IOException {
|
|
|
|
|
|
Configuration conf = new Configuration();
|
|
|
conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
|
|
@@ -243,17 +250,17 @@ public abstract class LazyPersistTestCase {
|
|
|
HEARTBEAT_RECHECK_INTERVAL_MSEC);
|
|
|
conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
|
|
|
LAZY_WRITER_INTERVAL_SEC);
|
|
|
- conf.setInt(DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES,
|
|
|
- EVICTION_LOW_WATERMARK * BLOCK_SIZE);
|
|
|
+ conf.setLong(DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES,
|
|
|
+ evictionLowWatermarkReplicas * BLOCK_SIZE);
|
|
|
|
|
|
if (useSCR) {
|
|
|
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
|
|
|
// Do not share a client context across tests.
|
|
|
conf.set(DFS_CLIENT_CONTEXT, UUID.randomUUID().toString());
|
|
|
+ conf.set(DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
|
|
|
+ UserGroupInformation.getCurrentUser().getShortUserName());
|
|
|
if (useLegacyBlockReaderLocal) {
|
|
|
conf.setBoolean(DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL, true);
|
|
|
- conf.set(DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
|
|
|
- UserGroupInformation.getCurrentUser().getShortUserName());
|
|
|
} else {
|
|
|
sockDir = new TemporarySocketDirectory();
|
|
|
conf.set(DFS_DOMAIN_SOCKET_PATH_KEY, new File(sockDir.getDir(),
|
|
@@ -261,22 +268,29 @@ public abstract class LazyPersistTestCase {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- long[] capacities = null;
|
|
|
+ Preconditions.checkState(
|
|
|
+ ramDiskReplicaCapacity < 0 || ramDiskStorageLimit < 0,
|
|
|
+ "Cannot specify non-default values for both ramDiskReplicaCapacity "
|
|
|
+ + "and ramDiskStorageLimit");
|
|
|
+
|
|
|
+ long[] capacities;
|
|
|
if (hasTransientStorage && ramDiskReplicaCapacity >= 0) {
|
|
|
// Convert replica count to byte count, add some delta for .meta and
|
|
|
// VERSION files.
|
|
|
- long ramDiskStorageLimit = ((long) ramDiskReplicaCapacity * BLOCK_SIZE) +
|
|
|
+ ramDiskStorageLimit = ((long) ramDiskReplicaCapacity * BLOCK_SIZE) +
|
|
|
(BLOCK_SIZE - 1);
|
|
|
- capacities = new long[] { ramDiskStorageLimit, -1 };
|
|
|
}
|
|
|
+ capacities = new long[] { ramDiskStorageLimit, -1 };
|
|
|
|
|
|
cluster = new MiniDFSCluster
|
|
|
.Builder(conf)
|
|
|
- .numDataNodes(REPL_FACTOR)
|
|
|
+ .numDataNodes(numDatanodes)
|
|
|
.storageCapacities(capacities)
|
|
|
- .storageTypes(hasTransientStorage ?
|
|
|
- new StorageType[]{ RAM_DISK, DEFAULT } : null)
|
|
|
+ .storageTypes(storageTypes != null ? storageTypes :
|
|
|
+ (hasTransientStorage ? new StorageType[]{RAM_DISK, DEFAULT} : null))
|
|
|
.build();
|
|
|
+ cluster.waitActive();
|
|
|
+
|
|
|
fs = cluster.getFileSystem();
|
|
|
client = fs.getClient();
|
|
|
try {
|
|
@@ -287,65 +301,77 @@ public abstract class LazyPersistTestCase {
|
|
|
LOG.info("Cluster startup complete");
|
|
|
}
|
|
|
|
|
|
+ ClusterWithRamDiskBuilder getClusterBuilder() {
|
|
|
+ return new ClusterWithRamDiskBuilder();
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
- * If ramDiskStorageLimit is >=0, then RAM_DISK capacity is artificially
|
|
|
- * capped. If ramDiskStorageLimit < 0 then it is ignored.
|
|
|
+ * Builder class that allows controlling RAM disk-specific properties for a
|
|
|
+ * MiniDFSCluster.
|
|
|
*/
|
|
|
- protected final void startUpCluster(final int numDataNodes,
|
|
|
- final StorageType[] storageTypes,
|
|
|
- final long ramDiskStorageLimit,
|
|
|
- final boolean useSCR)
|
|
|
- throws IOException {
|
|
|
+ class ClusterWithRamDiskBuilder {
|
|
|
+ public ClusterWithRamDiskBuilder setNumDatanodes(
|
|
|
+ int numDatanodes) {
|
|
|
+ this.numDatanodes = numDatanodes;
|
|
|
+ return this;
|
|
|
+ }
|
|
|
|
|
|
- Configuration conf = new Configuration();
|
|
|
- conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
|
|
|
- conf.setInt(DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC,
|
|
|
- LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC);
|
|
|
- conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL_SEC);
|
|
|
- conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
|
|
|
- HEARTBEAT_RECHECK_INTERVAL_MSEC);
|
|
|
- conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
|
|
|
- LAZY_WRITER_INTERVAL_SEC);
|
|
|
+ public ClusterWithRamDiskBuilder setStorageTypes(
|
|
|
+ StorageType[] storageTypes) {
|
|
|
+ this.storageTypes = storageTypes;
|
|
|
+ return this;
|
|
|
+ }
|
|
|
|
|
|
- if (useSCR)
|
|
|
- {
|
|
|
- conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY,useSCR);
|
|
|
- conf.set(DFS_CLIENT_CONTEXT, UUID.randomUUID().toString());
|
|
|
- sockDir = new TemporarySocketDirectory();
|
|
|
- conf.set(DFS_DOMAIN_SOCKET_PATH_KEY, new File(sockDir.getDir(),
|
|
|
- this.getClass().getSimpleName() + "._PORT.sock").getAbsolutePath());
|
|
|
- conf.set(DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
|
|
|
- UserGroupInformation.getCurrentUser().getShortUserName());
|
|
|
+ public ClusterWithRamDiskBuilder setRamDiskReplicaCapacity(
|
|
|
+ int ramDiskReplicaCapacity) {
|
|
|
+ this.ramDiskReplicaCapacity = ramDiskReplicaCapacity;
|
|
|
+ return this;
|
|
|
}
|
|
|
|
|
|
- cluster = new MiniDFSCluster
|
|
|
- .Builder(conf)
|
|
|
- .numDataNodes(numDataNodes)
|
|
|
- .storageTypes(storageTypes != null ?
|
|
|
- storageTypes : new StorageType[] { DEFAULT, DEFAULT })
|
|
|
- .build();
|
|
|
- fs = cluster.getFileSystem();
|
|
|
- client = fs.getClient();
|
|
|
+ public ClusterWithRamDiskBuilder setRamDiskStorageLimit(
|
|
|
+ long ramDiskStorageLimit) {
|
|
|
+ this.ramDiskStorageLimit = ramDiskStorageLimit;
|
|
|
+ return this;
|
|
|
+ }
|
|
|
|
|
|
- // Artificially cap the storage capacity of the RAM_DISK volume.
|
|
|
- if (ramDiskStorageLimit >= 0) {
|
|
|
- List<? extends FsVolumeSpi> volumes =
|
|
|
- cluster.getDataNodes().get(0).getFSDataset().getVolumes();
|
|
|
+ public ClusterWithRamDiskBuilder setUseScr(boolean useScr) {
|
|
|
+ this.useScr = useScr;
|
|
|
+ return this;
|
|
|
+ }
|
|
|
|
|
|
- for (FsVolumeSpi volume : volumes) {
|
|
|
- if (volume.getStorageType() == RAM_DISK) {
|
|
|
- ((FsVolumeImpl) volume).setCapacityForTesting(ramDiskStorageLimit);
|
|
|
- }
|
|
|
- }
|
|
|
+ public ClusterWithRamDiskBuilder setHasTransientStorage(
|
|
|
+ boolean hasTransientStorage) {
|
|
|
+ this.hasTransientStorage = hasTransientStorage;
|
|
|
+ return this;
|
|
|
}
|
|
|
|
|
|
- LOG.info("Cluster startup complete");
|
|
|
- }
|
|
|
+ public ClusterWithRamDiskBuilder setUseLegacyBlockReaderLocal(
|
|
|
+ boolean useLegacyBlockReaderLocal) {
|
|
|
+ this.useLegacyBlockReaderLocal = useLegacyBlockReaderLocal;
|
|
|
+ return this;
|
|
|
+ }
|
|
|
+
|
|
|
+ public ClusterWithRamDiskBuilder setEvictionLowWatermarkReplicas(
|
|
|
+ long evictionLowWatermarkReplicas) {
|
|
|
+ this.evictionLowWatermarkReplicas = evictionLowWatermarkReplicas;
|
|
|
+ return this;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void build() throws IOException {
|
|
|
+ LazyPersistTestCase.this.startUpCluster(
|
|
|
+ numDatanodes, hasTransientStorage, storageTypes, ramDiskReplicaCapacity,
|
|
|
+ ramDiskStorageLimit, evictionLowWatermarkReplicas,
|
|
|
+ useScr, useLegacyBlockReaderLocal);
|
|
|
+ }
|
|
|
|
|
|
- protected final void startUpCluster(boolean hasTransientStorage,
|
|
|
- final int ramDiskReplicaCapacity)
|
|
|
- throws IOException {
|
|
|
- startUpCluster(hasTransientStorage, ramDiskReplicaCapacity, false, false);
|
|
|
+ private int numDatanodes = REPL_FACTOR;
|
|
|
+ private StorageType[] storageTypes = null;
|
|
|
+ private int ramDiskReplicaCapacity = -1;
|
|
|
+ private long ramDiskStorageLimit = -1;
|
|
|
+ private boolean hasTransientStorage = true;
|
|
|
+ private boolean useScr = false;
|
|
|
+ private boolean useLegacyBlockReaderLocal = false;
|
|
|
+ private long evictionLowWatermarkReplicas = EVICTION_LOW_WATERMARK;
|
|
|
}
|
|
|
|
|
|
protected final void triggerBlockReport()
|