@@ -30,6 +30,8 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -65,7 +67,7 @@ class FsDatasetAsyncDiskService {
   // ThreadPool core pool size
   private static final int CORE_THREADS_PER_VOLUME = 1;
   // ThreadPool maximum pool size
-  private static final int MAXIMUM_THREADS_PER_VOLUME = 4;
+  private final int maxNumThreadsPerVolume;
   // ThreadPool keep-alive time for threads over core pool size
   private static final long THREADS_KEEP_ALIVE_SECONDS = 60;
 
@@ -90,6 +92,12 @@ class FsDatasetAsyncDiskService {
     this.datanode = datanode;
     this.fsdatasetImpl = fsdatasetImpl;
     this.threadGroup = new ThreadGroup(getClass().getSimpleName());
+    maxNumThreadsPerVolume = datanode.getConf().getInt(
+        DFSConfigKeys.DFS_DATANODE_FSDATASETASYNCDISK_MAX_THREADS_PER_VOLUME_KEY,
+        DFSConfigKeys.DFS_DATANODE_FSDATASETASYNCDISK_MAX_THREADS_PER_VOLUME_DEFAULT);
+    Preconditions.checkArgument(maxNumThreadsPerVolume > 0,
+        DFSConfigKeys.DFS_DATANODE_FSDATASETASYNCDISK_MAX_THREADS_PER_VOLUME_KEY +
+            " must be a positive integer.");
   }
 
   private void addExecutorForVolume(final FsVolumeImpl volume) {
@@ -110,7 +118,7 @@ class FsDatasetAsyncDiskService {
     };
 
     ThreadPoolExecutor executor = new ThreadPoolExecutor(
-        CORE_THREADS_PER_VOLUME, MAXIMUM_THREADS_PER_VOLUME,
+        CORE_THREADS_PER_VOLUME, maxNumThreadsPerVolume,
         THREADS_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
         new LinkedBlockingQueue<Runnable>(), threadFactory);
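For anyone trying the change out, below is a minimal sketch of setting and reading the new per-volume thread cap through a Hadoop Configuration. It assumes only the two DFSConfigKeys constants referenced in the hunks above (their concrete property name and default value live in the companion DFSConfigKeys change, which is not part of this hunk); the value 8 is purely illustrative.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class AsyncDiskThreadsConfigCheck {
  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // Illustrative value; the Preconditions check above only requires it to be positive.
    conf.setInt(
        DFSConfigKeys.DFS_DATANODE_FSDATASETASYNCDISK_MAX_THREADS_PER_VOLUME_KEY, 8);

    // Mirrors the lookup the FsDatasetAsyncDiskService constructor now performs.
    int maxThreadsPerVolume = conf.getInt(
        DFSConfigKeys.DFS_DATANODE_FSDATASETASYNCDISK_MAX_THREADS_PER_VOLUME_KEY,
        DFSConfigKeys.DFS_DATANODE_FSDATASETASYNCDISK_MAX_THREADS_PER_VOLUME_DEFAULT);

    System.out.println("max async disk threads per volume = " + maxThreadsPerVolume);
  }
}

Each volume's ThreadPoolExecutor is then constructed with CORE_THREADS_PER_VOLUME core threads and this configured value as its maximum pool size, so the cap applies per volume rather than per DataNode.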