|
@@ -38,6 +38,7 @@ import java.util.concurrent.Executors;
|
|
import java.util.concurrent.Future;
|
|
import java.util.concurrent.Future;
|
|
import java.util.concurrent.ScheduledExecutorService;
|
|
import java.util.concurrent.ScheduledExecutorService;
|
|
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
|
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
|
|
|
+import java.util.concurrent.ThreadFactory;
|
|
import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
import org.apache.hadoop.fs.FileUtil;
|
|
import org.apache.hadoop.fs.FileUtil;
|
|
@@ -110,6 +111,8 @@ import org.apache.hadoop.yarn.service.AbstractService;
|
|
import org.apache.hadoop.yarn.service.CompositeService;
|
|
import org.apache.hadoop.yarn.service.CompositeService;
|
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
|
|
|
|
|
|
|
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
|
|
|
+
|
|
public class ResourceLocalizationService extends CompositeService
|
|
public class ResourceLocalizationService extends CompositeService
|
|
implements EventHandler<LocalizationEvent>, LocalizationProtocol {
|
|
implements EventHandler<LocalizationEvent>, LocalizationProtocol {
|
|
|
|
|
|
@@ -156,7 +159,10 @@ public class ResourceLocalizationService extends CompositeService
|
|
this.delService = delService;
|
|
this.delService = delService;
|
|
this.localDirsSelector = new LocalDirAllocator(YarnConfiguration.NM_LOCAL_DIRS);
|
|
this.localDirsSelector = new LocalDirAllocator(YarnConfiguration.NM_LOCAL_DIRS);
|
|
this.publicRsrc = new LocalResourcesTrackerImpl(null, dispatcher);
|
|
this.publicRsrc = new LocalResourcesTrackerImpl(null, dispatcher);
|
|
- this.cacheCleanup = new ScheduledThreadPoolExecutor(1);
|
|
|
|
|
|
+ this.cacheCleanup = new ScheduledThreadPoolExecutor(1,
|
|
|
|
+ new ThreadFactoryBuilder()
|
|
|
|
+ .setNameFormat("ResourceLocalizationService Cache Cleanup")
|
|
|
|
+ .build());
|
|
}
|
|
}
|
|
|
|
|
|
FileContext getLocalFileContext(Configuration conf) {
|
|
FileContext getLocalFileContext(Configuration conf) {
|
|
@@ -532,6 +538,17 @@ public class ResourceLocalizationService extends CompositeService
|
|
|
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private static ExecutorService createLocalizerExecutor(Configuration conf) {
|
|
|
|
+ int nThreads = conf.getInt(
|
|
|
|
+ YarnConfiguration.NM_LOCALIZER_FETCH_THREAD_COUNT,
|
|
|
|
+ YarnConfiguration.DEFAULT_NM_LOCALIZER_FETCH_THREAD_COUNT);
|
|
|
|
+ ThreadFactory tf = new ThreadFactoryBuilder()
|
|
|
|
+ .setNameFormat("PublicLocalizer #%d")
|
|
|
|
+ .build();
|
|
|
|
+ return Executors.newFixedThreadPool(nThreads, tf);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+
|
|
class PublicLocalizer extends Thread {
|
|
class PublicLocalizer extends Thread {
|
|
|
|
|
|
static final String PUBCACHE_CTXT = "public.cache.dirs";
|
|
static final String PUBCACHE_CTXT = "public.cache.dirs";
|
|
@@ -547,16 +564,16 @@ public class ResourceLocalizationService extends CompositeService
|
|
|
|
|
|
PublicLocalizer(Configuration conf) {
|
|
PublicLocalizer(Configuration conf) {
|
|
this(conf, getLocalFileContext(conf),
|
|
this(conf, getLocalFileContext(conf),
|
|
- Executors.newFixedThreadPool(conf.getInt(
|
|
|
|
- YarnConfiguration.NM_LOCALIZER_FETCH_THREAD_COUNT, YarnConfiguration.DEFAULT_NM_LOCALIZER_FETCH_THREAD_COUNT)),
|
|
|
|
|
|
+ createLocalizerExecutor(conf),
|
|
new HashMap<Future<Path>,LocalizerResourceRequestEvent>(),
|
|
new HashMap<Future<Path>,LocalizerResourceRequestEvent>(),
|
|
new HashMap<LocalResourceRequest,List<LocalizerResourceRequestEvent>>());
|
|
new HashMap<LocalResourceRequest,List<LocalizerResourceRequestEvent>>());
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
PublicLocalizer(Configuration conf, FileContext lfs,
|
|
PublicLocalizer(Configuration conf, FileContext lfs,
|
|
ExecutorService threadPool,
|
|
ExecutorService threadPool,
|
|
Map<Future<Path>,LocalizerResourceRequestEvent> pending,
|
|
Map<Future<Path>,LocalizerResourceRequestEvent> pending,
|
|
Map<LocalResourceRequest,List<LocalizerResourceRequestEvent>> attempts) {
|
|
Map<LocalResourceRequest,List<LocalizerResourceRequestEvent>> attempts) {
|
|
|
|
+ super("Public Localizer");
|
|
this.lfs = lfs;
|
|
this.lfs = lfs;
|
|
this.conf = conf;
|
|
this.conf = conf;
|
|
this.pending = pending;
|
|
this.pending = pending;
|
|
@@ -673,6 +690,7 @@ public class ResourceLocalizationService extends CompositeService
|
|
RecordFactoryProvider.getRecordFactory(getConfig());
|
|
RecordFactoryProvider.getRecordFactory(getConfig());
|
|
|
|
|
|
LocalizerRunner(LocalizerContext context, String localizerId) {
|
|
LocalizerRunner(LocalizerContext context, String localizerId) {
|
|
|
|
+ super("LocalizerRunner for " + localizerId);
|
|
this.context = context;
|
|
this.context = context;
|
|
this.localizerId = localizerId;
|
|
this.localizerId = localizerId;
|
|
this.pending = new ArrayList<LocalizerResourceRequestEvent>();
|
|
this.pending = new ArrayList<LocalizerResourceRequestEvent>();
|
|
@@ -863,6 +881,7 @@ public class ResourceLocalizationService extends CompositeService
|
|
private final Dispatcher dispatcher;
|
|
private final Dispatcher dispatcher;
|
|
|
|
|
|
public CacheCleanup(Dispatcher dispatcher) {
|
|
public CacheCleanup(Dispatcher dispatcher) {
|
|
|
|
+ super("CacheCleanup");
|
|
this.dispatcher = dispatcher;
|
|
this.dispatcher = dispatcher;
|
|
}
|
|
}
|
|
|
|
|