|
@@ -107,9 +107,10 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.even
|
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerSecurityInfo;
|
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerSecurityInfo;
|
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenSecretManager;
|
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenSecretManager;
|
|
import org.apache.hadoop.yarn.service.AbstractService;
|
|
import org.apache.hadoop.yarn.service.AbstractService;
|
|
|
|
+import org.apache.hadoop.yarn.service.CompositeService;
|
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
|
|
|
|
|
-public class ResourceLocalizationService extends AbstractService
|
|
|
|
|
|
+public class ResourceLocalizationService extends CompositeService
|
|
implements EventHandler<LocalizationEvent>, LocalizationProtocol {
|
|
implements EventHandler<LocalizationEvent>, LocalizationProtocol {
|
|
|
|
|
|
private static final Log LOG = LogFactory.getLog(ResourceLocalizationService.class);
|
|
private static final Log LOG = LogFactory.getLog(ResourceLocalizationService.class);
|
|
@@ -201,9 +202,8 @@ public class ResourceLocalizationService extends AbstractService
|
|
localizationServerAddress = NetUtils.createSocketAddr(
|
|
localizationServerAddress = NetUtils.createSocketAddr(
|
|
conf.get(YarnConfiguration.NM_LOCALIZER_ADDRESS, YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS));
|
|
conf.get(YarnConfiguration.NM_LOCALIZER_ADDRESS, YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS));
|
|
localizerTracker = createLocalizerTracker(conf);
|
|
localizerTracker = createLocalizerTracker(conf);
|
|
|
|
+ addService(localizerTracker);
|
|
dispatcher.register(LocalizerEventType.class, localizerTracker);
|
|
dispatcher.register(LocalizerEventType.class, localizerTracker);
|
|
- cacheCleanup.scheduleWithFixedDelay(new CacheCleanup(dispatcher),
|
|
|
|
- cacheCleanupPeriod, cacheCleanupPeriod, TimeUnit.MILLISECONDS);
|
|
|
|
super.init(conf);
|
|
super.init(conf);
|
|
}
|
|
}
|
|
|
|
|
|
@@ -214,6 +214,8 @@ public class ResourceLocalizationService extends AbstractService
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public void start() {
|
|
public void start() {
|
|
|
|
+ cacheCleanup.scheduleWithFixedDelay(new CacheCleanup(dispatcher),
|
|
|
|
+ cacheCleanupPeriod, cacheCleanupPeriod, TimeUnit.MILLISECONDS);
|
|
server = createServer();
|
|
server = createServer();
|
|
LOG.info("Localizer started on port " + server.getPort());
|
|
LOG.info("Localizer started on port " + server.getPort());
|
|
server.start();
|
|
server.start();
|
|
@@ -247,9 +249,7 @@ public class ResourceLocalizationService extends AbstractService
|
|
if (server != null) {
|
|
if (server != null) {
|
|
server.close();
|
|
server.close();
|
|
}
|
|
}
|
|
- if (localizerTracker != null) {
|
|
|
|
- localizerTracker.stop();
|
|
|
|
- }
|
|
|
|
|
|
+ cacheCleanup.shutdown();
|
|
super.stop();
|
|
super.stop();
|
|
}
|
|
}
|
|
|
|
|
|
@@ -403,7 +403,7 @@ public class ResourceLocalizationService extends AbstractService
|
|
/**
|
|
/**
|
|
* Sub-component handling the spawning of {@link ContainerLocalizer}s
|
|
* Sub-component handling the spawning of {@link ContainerLocalizer}s
|
|
*/
|
|
*/
|
|
- class LocalizerTracker implements EventHandler<LocalizerEvent> {
|
|
|
|
|
|
+ class LocalizerTracker extends AbstractService implements EventHandler<LocalizerEvent> {
|
|
|
|
|
|
private final PublicLocalizer publicLocalizer;
|
|
private final PublicLocalizer publicLocalizer;
|
|
private final Map<String,LocalizerRunner> privLocalizers;
|
|
private final Map<String,LocalizerRunner> privLocalizers;
|
|
@@ -414,9 +414,15 @@ public class ResourceLocalizationService extends AbstractService
|
|
|
|
|
|
LocalizerTracker(Configuration conf,
|
|
LocalizerTracker(Configuration conf,
|
|
Map<String,LocalizerRunner> privLocalizers) {
|
|
Map<String,LocalizerRunner> privLocalizers) {
|
|
|
|
+ super(LocalizerTracker.class.getName());
|
|
this.publicLocalizer = new PublicLocalizer(conf);
|
|
this.publicLocalizer = new PublicLocalizer(conf);
|
|
this.privLocalizers = privLocalizers;
|
|
this.privLocalizers = privLocalizers;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public synchronized void start() {
|
|
publicLocalizer.start();
|
|
publicLocalizer.start();
|
|
|
|
+ super.start();
|
|
}
|
|
}
|
|
|
|
|
|
public LocalizerHeartbeatResponse processHeartbeat(LocalizerStatus status) {
|
|
public LocalizerHeartbeatResponse processHeartbeat(LocalizerStatus status) {
|
|
@@ -435,12 +441,14 @@ public class ResourceLocalizationService extends AbstractService
|
|
return localizer.update(status.getResources());
|
|
return localizer.update(status.getResources());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
|
|
+ @Override
|
|
public void stop() {
|
|
public void stop() {
|
|
for (LocalizerRunner localizer : privLocalizers.values()) {
|
|
for (LocalizerRunner localizer : privLocalizers.values()) {
|
|
localizer.interrupt();
|
|
localizer.interrupt();
|
|
}
|
|
}
|
|
publicLocalizer.interrupt();
|
|
publicLocalizer.interrupt();
|
|
|
|
+ super.stop();
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|