|
@@ -69,9 +69,7 @@ public class NodeTimelineCollectorManager extends TimelineCollectorManager {
|
|
|
|
|
|
private String timelineRestServerBindAddress;
|
|
|
|
|
|
- private CollectorNodemanagerProtocol nmCollectorService;
|
|
|
-
|
|
|
- private InetSocketAddress nmCollectorServiceAddress;
|
|
|
+ private volatile CollectorNodemanagerProtocol nmCollectorService;
|
|
|
|
|
|
static final String COLLECTOR_MANAGER_ATTR_KEY = "collector.manager";
|
|
|
|
|
@@ -84,19 +82,8 @@ public class NodeTimelineCollectorManager extends TimelineCollectorManager {
|
|
|
super(NodeTimelineCollectorManager.class.getName());
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
- public void serviceInit(Configuration conf) throws Exception {
|
|
|
- this.nmCollectorServiceAddress = conf.getSocketAddr(
|
|
|
- YarnConfiguration.NM_BIND_HOST,
|
|
|
- YarnConfiguration.NM_COLLECTOR_SERVICE_ADDRESS,
|
|
|
- YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_ADDRESS,
|
|
|
- YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_PORT);
|
|
|
- super.serviceInit(conf);
|
|
|
- }
|
|
|
-
|
|
|
@Override
|
|
|
protected void serviceStart() throws Exception {
|
|
|
- nmCollectorService = getNMCollectorService();
|
|
|
startWebApp();
|
|
|
super.serviceStart();
|
|
|
}
|
|
@@ -176,7 +163,7 @@ public class NodeTimelineCollectorManager extends TimelineCollectorManager {
|
|
|
this.timelineRestServerBindAddress);
|
|
|
LOG.info("Report a new collector for application: " + appId +
|
|
|
" to the NM Collector Service.");
|
|
|
- nmCollectorService.reportNewCollectorInfo(request);
|
|
|
+ getNMCollectorService().reportNewCollectorInfo(request);
|
|
|
}
|
|
|
|
|
|
private void updateTimelineCollectorContext(
|
|
@@ -186,7 +173,7 @@ public class NodeTimelineCollectorManager extends TimelineCollectorManager {
|
|
|
GetTimelineCollectorContextRequest.newInstance(appId);
|
|
|
LOG.info("Get timeline collector context for " + appId);
|
|
|
GetTimelineCollectorContextResponse response =
|
|
|
- nmCollectorService.getTimelineCollectorContext(request);
|
|
|
+ getNMCollectorService().getTimelineCollectorContext(request);
|
|
|
String userId = response.getUserId();
|
|
|
if (userId != null && !userId.isEmpty()) {
|
|
|
collector.getTimelineEntityContext().setUserId(userId);
|
|
@@ -207,13 +194,26 @@ public class NodeTimelineCollectorManager extends TimelineCollectorManager {
|
|
|
|
|
|
@VisibleForTesting
|
|
|
protected CollectorNodemanagerProtocol getNMCollectorService() {
|
|
|
- Configuration conf = getConfig();
|
|
|
- final YarnRPC rpc = YarnRPC.create(conf);
|
|
|
-
|
|
|
- // TODO Security settings.
|
|
|
- return (CollectorNodemanagerProtocol) rpc.getProxy(
|
|
|
- CollectorNodemanagerProtocol.class,
|
|
|
- nmCollectorServiceAddress, conf);
|
|
|
+ if (nmCollectorService == null) {
|
|
|
+ synchronized (this) {
|
|
|
+ if (nmCollectorService == null) {
|
|
|
+ Configuration conf = getConfig();
|
|
|
+ InetSocketAddress nmCollectorServiceAddress = conf.getSocketAddr(
|
|
|
+ YarnConfiguration.NM_BIND_HOST,
|
|
|
+ YarnConfiguration.NM_COLLECTOR_SERVICE_ADDRESS,
|
|
|
+ YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_ADDRESS,
|
|
|
+ YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_PORT);
|
|
|
+ LOG.info("nmCollectorServiceAddress: " + nmCollectorServiceAddress);
|
|
|
+ final YarnRPC rpc = YarnRPC.create(conf);
|
|
|
+
|
|
|
+ // TODO Security settings.
|
|
|
+ nmCollectorService = (CollectorNodemanagerProtocol) rpc.getProxy(
|
|
|
+ CollectorNodemanagerProtocol.class,
|
|
|
+ nmCollectorServiceAddress, conf);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return nmCollectorService;
|
|
|
}
|
|
|
|
|
|
@VisibleForTesting
|