|
@@ -46,6 +46,7 @@ import java.util.HashSet;
|
|
import java.util.Set;
|
|
import java.util.Set;
|
|
import java.util.concurrent.CountDownLatch;
|
|
import java.util.concurrent.CountDownLatch;
|
|
import java.util.concurrent.ExecutionException;
|
|
import java.util.concurrent.ExecutionException;
|
|
|
|
+import java.util.concurrent.ExecutorService;
|
|
import java.util.concurrent.Executors;
|
|
import java.util.concurrent.Executors;
|
|
import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
@@ -103,6 +104,8 @@ public class DatasetVolumeChecker {
|
|
private static final VolumeCheckContext IGNORED_CONTEXT =
|
|
private static final VolumeCheckContext IGNORED_CONTEXT =
|
|
new VolumeCheckContext();
|
|
new VolumeCheckContext();
|
|
|
|
|
|
|
|
+ private final ExecutorService checkVolumeResultHandlerExecutorService;
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* @param conf Configuration object.
|
|
* @param conf Configuration object.
|
|
* @param timer {@link Timer} object used for throttling checks.
|
|
* @param timer {@link Timer} object used for throttling checks.
|
|
@@ -163,6 +166,12 @@ public class DatasetVolumeChecker {
|
|
.setNameFormat("DataNode DiskChecker thread %d")
|
|
.setNameFormat("DataNode DiskChecker thread %d")
|
|
.setDaemon(true)
|
|
.setDaemon(true)
|
|
.build()));
|
|
.build()));
|
|
|
|
+
|
|
|
|
+ checkVolumeResultHandlerExecutorService = Executors.newCachedThreadPool(
|
|
|
|
+ new ThreadFactoryBuilder()
|
|
|
|
+ .setNameFormat("VolumeCheck ResultHandler thread %d")
|
|
|
|
+ .setDaemon(true)
|
|
|
|
+ .build());
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -293,7 +302,9 @@ public class DatasetVolumeChecker {
|
|
Futures.addCallback(olf.get(),
|
|
Futures.addCallback(olf.get(),
|
|
new ResultHandler(volumeReference, new HashSet<FsVolumeSpi>(),
|
|
new ResultHandler(volumeReference, new HashSet<FsVolumeSpi>(),
|
|
new HashSet<FsVolumeSpi>(),
|
|
new HashSet<FsVolumeSpi>(),
|
|
- new AtomicLong(1), callback));
|
|
|
|
|
|
+ new AtomicLong(1), callback),
|
|
|
|
+ checkVolumeResultHandlerExecutorService
|
|
|
|
+ );
|
|
return true;
|
|
return true;
|
|
} else {
|
|
} else {
|
|
IOUtils.cleanup(null, volumeReference);
|
|
IOUtils.cleanup(null, volumeReference);
|