|
@@ -16,11 +16,14 @@
|
|
|
* limitations under the License.
|
|
|
*/
|
|
|
|
|
|
-package org.apache.hadoop.yarn.server.timelineservice.aggregator;
|
|
|
+package org.apache.hadoop.yarn.server.timelineservice.collector;
|
|
|
+
|
|
|
+import static org.apache.hadoop.fs.CommonConfigurationKeys.DEFAULT_HADOOP_HTTP_STATIC_USER;
|
|
|
+import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_HTTP_STATIC_USER;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
-import java.net.URI;
|
|
|
import java.net.InetSocketAddress;
|
|
|
+import java.net.URI;
|
|
|
import java.util.Collections;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.Map;
|
|
@@ -39,64 +42,64 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
|
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
|
|
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
|
|
-import org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocol;
|
|
|
-import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoRequest;
|
|
|
+import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
|
|
|
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest;
|
|
|
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
|
|
|
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
|
|
|
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
|
|
|
|
|
-import static org.apache.hadoop.fs.CommonConfigurationKeys.DEFAULT_HADOOP_HTTP_STATIC_USER;
|
|
|
-import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_HTTP_STATIC_USER;
|
|
|
+import com.google.common.annotations.VisibleForTesting;
|
|
|
|
|
|
/**
|
|
|
- * Class that manages adding and removing aggregators and their lifecycle. It
|
|
|
- * provides thread safety access to the aggregators inside.
|
|
|
+ * Class that manages adding and removing collectors and their lifecycle. It
|
|
|
+ * provides thread safety access to the collectors inside.
|
|
|
*
|
|
|
* It is a singleton, and instances should be obtained via
|
|
|
* {@link #getInstance()}.
|
|
|
*/
|
|
|
@Private
|
|
|
@Unstable
|
|
|
-public class TimelineAggregatorsCollection extends CompositeService {
|
|
|
+public class TimelineCollectorManager extends CompositeService {
|
|
|
private static final Log LOG =
|
|
|
- LogFactory.getLog(TimelineAggregatorsCollection.class);
|
|
|
- private static final TimelineAggregatorsCollection INSTANCE =
|
|
|
- new TimelineAggregatorsCollection();
|
|
|
+ LogFactory.getLog(TimelineCollectorManager.class);
|
|
|
+ private static final TimelineCollectorManager INSTANCE =
|
|
|
+ new TimelineCollectorManager();
|
|
|
|
|
|
// access to this map is synchronized with the map itself
|
|
|
- private final Map<String, TimelineAggregator> aggregators =
|
|
|
+ private final Map<String, TimelineCollector> collectors =
|
|
|
Collections.synchronizedMap(
|
|
|
- new HashMap<String, TimelineAggregator>());
|
|
|
+ new HashMap<String, TimelineCollector>());
|
|
|
|
|
|
- // REST server for this aggregator collection
|
|
|
+ // REST server for this collector manager
|
|
|
private HttpServer2 timelineRestServer;
|
|
|
-
|
|
|
+
|
|
|
private String timelineRestServerBindAddress;
|
|
|
-
|
|
|
- private AggregatorNodemanagerProtocol nmAggregatorService;
|
|
|
-
|
|
|
- private InetSocketAddress nmAggregatorServiceAddress;
|
|
|
|
|
|
- static final String AGGREGATOR_COLLECTION_ATTR_KEY = "aggregator.collection";
|
|
|
+ private CollectorNodemanagerProtocol nmCollectorService;
|
|
|
+
|
|
|
+ private InetSocketAddress nmCollectorServiceAddress;
|
|
|
+
|
|
|
+ static final String COLLECTOR_MANAGER_ATTR_KEY = "collector.manager";
|
|
|
|
|
|
- static TimelineAggregatorsCollection getInstance() {
|
|
|
+ static TimelineCollectorManager getInstance() {
|
|
|
return INSTANCE;
|
|
|
}
|
|
|
|
|
|
- TimelineAggregatorsCollection() {
|
|
|
- super(TimelineAggregatorsCollection.class.getName());
|
|
|
+ @VisibleForTesting
|
|
|
+ protected TimelineCollectorManager() {
|
|
|
+ super(TimelineCollectorManager.class.getName());
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void serviceInit(Configuration conf) throws Exception {
|
|
|
- this.nmAggregatorServiceAddress = conf.getSocketAddr(
|
|
|
+ this.nmCollectorServiceAddress = conf.getSocketAddr(
|
|
|
YarnConfiguration.NM_BIND_HOST,
|
|
|
- YarnConfiguration.NM_AGGREGATOR_SERVICE_ADDRESS,
|
|
|
- YarnConfiguration.DEFAULT_NM_AGGREGATOR_SERVICE_ADDRESS,
|
|
|
- YarnConfiguration.DEFAULT_NM_AGGREGATOR_SERVICE_PORT);
|
|
|
-
|
|
|
+ YarnConfiguration.NM_COLLECTOR_SERVICE_ADDRESS,
|
|
|
+ YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_ADDRESS,
|
|
|
+ YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_PORT);
|
|
|
+
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
@Override
|
|
|
protected void serviceStart() throws Exception {
|
|
|
startWebApp();
|
|
@@ -112,95 +115,95 @@ public class TimelineAggregatorsCollection extends CompositeService {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Put the aggregator into the collection if an aggregator mapped by id does
|
|
|
+ * Put the collector into the collection if an collector mapped by id does
|
|
|
* not exist.
|
|
|
*
|
|
|
* @throws YarnRuntimeException if there was any exception in initializing and
|
|
|
* starting the app level service
|
|
|
- * @return the aggregator associated with id after the potential put.
|
|
|
+ * @return the collector associated with id after the potential put.
|
|
|
*/
|
|
|
- public TimelineAggregator putIfAbsent(ApplicationId appId,
|
|
|
- TimelineAggregator aggregator) {
|
|
|
+ public TimelineCollector putIfAbsent(ApplicationId appId,
|
|
|
+ TimelineCollector collector) {
|
|
|
String id = appId.toString();
|
|
|
- TimelineAggregator aggregatorInTable;
|
|
|
- boolean aggregatorIsNew = false;
|
|
|
- synchronized (aggregators) {
|
|
|
- aggregatorInTable = aggregators.get(id);
|
|
|
- if (aggregatorInTable == null) {
|
|
|
+ TimelineCollector collectorInTable;
|
|
|
+ boolean collectorIsNew = false;
|
|
|
+ synchronized (collectors) {
|
|
|
+ collectorInTable = collectors.get(id);
|
|
|
+ if (collectorInTable == null) {
|
|
|
try {
|
|
|
// initialize, start, and add it to the collection so it can be
|
|
|
// cleaned up when the parent shuts down
|
|
|
- aggregator.init(getConfig());
|
|
|
- aggregator.start();
|
|
|
- aggregators.put(id, aggregator);
|
|
|
- LOG.info("the aggregator for " + id + " was added");
|
|
|
- aggregatorInTable = aggregator;
|
|
|
- aggregatorIsNew = true;
|
|
|
+ collector.init(getConfig());
|
|
|
+ collector.start();
|
|
|
+ collectors.put(id, collector);
|
|
|
+ LOG.info("the collector for " + id + " was added");
|
|
|
+ collectorInTable = collector;
|
|
|
+ collectorIsNew = true;
|
|
|
} catch (Exception e) {
|
|
|
throw new YarnRuntimeException(e);
|
|
|
}
|
|
|
} else {
|
|
|
- String msg = "the aggregator for " + id + " already exists!";
|
|
|
+ String msg = "the collector for " + id + " already exists!";
|
|
|
LOG.error(msg);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
}
|
|
|
- // Report to NM if a new aggregator is added.
|
|
|
- if (aggregatorIsNew) {
|
|
|
+ // Report to NM if a new collector is added.
|
|
|
+ if (collectorIsNew) {
|
|
|
try {
|
|
|
- reportNewAggregatorToNM(appId);
|
|
|
+ reportNewCollectorToNM(appId);
|
|
|
} catch (Exception e) {
|
|
|
// throw exception here as it cannot be used if failed report to NM
|
|
|
- LOG.error("Failed to report a new aggregator for application: " + appId +
|
|
|
- " to NM Aggregator Services.");
|
|
|
+ LOG.error("Failed to report a new collector for application: " + appId +
|
|
|
+ " to the NM Collector Service.");
|
|
|
throw new YarnRuntimeException(e);
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- return aggregatorInTable;
|
|
|
+
|
|
|
+ return collectorInTable;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Removes the aggregator for the specified id. The aggregator is also stopped
|
|
|
- * as a result. If the aggregator does not exist, no change is made.
|
|
|
+ * Removes the collector for the specified id. The collector is also stopped
|
|
|
+ * as a result. If the collector does not exist, no change is made.
|
|
|
*
|
|
|
* @return whether it was removed successfully
|
|
|
*/
|
|
|
public boolean remove(String id) {
|
|
|
- synchronized (aggregators) {
|
|
|
- TimelineAggregator aggregator = aggregators.remove(id);
|
|
|
- if (aggregator == null) {
|
|
|
- String msg = "the aggregator for " + id + " does not exist!";
|
|
|
+ synchronized (collectors) {
|
|
|
+ TimelineCollector collector = collectors.remove(id);
|
|
|
+ if (collector == null) {
|
|
|
+ String msg = "the collector for " + id + " does not exist!";
|
|
|
LOG.error(msg);
|
|
|
return false;
|
|
|
} else {
|
|
|
// stop the service to do clean up
|
|
|
- aggregator.stop();
|
|
|
- LOG.info("the aggregator service for " + id + " was removed");
|
|
|
+ collector.stop();
|
|
|
+ LOG.info("the collector service for " + id + " was removed");
|
|
|
return true;
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Returns the aggregator for the specified id.
|
|
|
+ * Returns the collector for the specified id.
|
|
|
*
|
|
|
- * @return the aggregator or null if it does not exist
|
|
|
+ * @return the collector or null if it does not exist
|
|
|
*/
|
|
|
- public TimelineAggregator get(String id) {
|
|
|
- return aggregators.get(id);
|
|
|
+ public TimelineCollector get(String id) {
|
|
|
+ return collectors.get(id);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Returns whether the aggregator for the specified id exists in this
|
|
|
+ * Returns whether the collector for the specified id exists in this
|
|
|
* collection.
|
|
|
*/
|
|
|
public boolean containsKey(String id) {
|
|
|
- return aggregators.containsKey(id);
|
|
|
+ return collectors.containsKey(id);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Launch the REST web server for this aggregator collection
|
|
|
+ * Launch the REST web server for this collector manager
|
|
|
*/
|
|
|
private void startWebApp() {
|
|
|
Configuration conf = getConfig();
|
|
@@ -211,7 +214,7 @@ public class TimelineAggregatorsCollection extends CompositeService {
|
|
|
WebAppUtils.getAHSWebAppURLWithoutScheme(conf));
|
|
|
this.timelineRestServerBindAddress = WebAppUtils.getResolvedAddress(
|
|
|
NetUtils.createSocketAddr(bindAddress));
|
|
|
- LOG.info("Instantiating the per-node aggregator webapp at " +
|
|
|
+ LOG.info("Instantiating the per-node collector webapp at " +
|
|
|
timelineRestServerBindAddress);
|
|
|
try {
|
|
|
Configuration confForInfoServer = new Configuration(conf);
|
|
@@ -232,40 +235,44 @@ public class TimelineAggregatorsCollection extends CompositeService {
|
|
|
options, new String[] {"/*"});
|
|
|
|
|
|
timelineRestServer.addJerseyResourcePackage(
|
|
|
- TimelineAggregatorWebService.class.getPackage().getName() + ";"
|
|
|
+ TimelineCollectorWebService.class.getPackage().getName() + ";"
|
|
|
+ GenericExceptionHandler.class.getPackage().getName() + ";"
|
|
|
+ YarnJacksonJaxbJsonProvider.class.getPackage().getName(),
|
|
|
"/*");
|
|
|
- timelineRestServer.setAttribute(AGGREGATOR_COLLECTION_ATTR_KEY,
|
|
|
- TimelineAggregatorsCollection.getInstance());
|
|
|
+ timelineRestServer.setAttribute(COLLECTOR_MANAGER_ATTR_KEY,
|
|
|
+ TimelineCollectorManager.getInstance());
|
|
|
timelineRestServer.start();
|
|
|
} catch (Exception e) {
|
|
|
- String msg = "The per-node aggregator webapp failed to start.";
|
|
|
+ String msg = "The per-node collector webapp failed to start.";
|
|
|
LOG.error(msg, e);
|
|
|
throw new YarnRuntimeException(msg, e);
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- private void reportNewAggregatorToNM(ApplicationId appId)
|
|
|
+
|
|
|
+ private void reportNewCollectorToNM(ApplicationId appId)
|
|
|
throws YarnException, IOException {
|
|
|
- this.nmAggregatorService = getNMAggregatorService();
|
|
|
- ReportNewAggregatorsInfoRequest request =
|
|
|
- ReportNewAggregatorsInfoRequest.newInstance(appId,
|
|
|
+ this.nmCollectorService = getNMCollectorService();
|
|
|
+ ReportNewCollectorInfoRequest request =
|
|
|
+ ReportNewCollectorInfoRequest.newInstance(appId,
|
|
|
this.timelineRestServerBindAddress);
|
|
|
- LOG.info("Report a new aggregator for application: " + appId +
|
|
|
- " to NM Aggregator Services.");
|
|
|
- nmAggregatorService.reportNewAggregatorInfo(request);
|
|
|
+ LOG.info("Report a new collector for application: " + appId +
|
|
|
+ " to the NM Collector Service.");
|
|
|
+ nmCollectorService.reportNewCollectorInfo(request);
|
|
|
}
|
|
|
-
|
|
|
- // protected for test
|
|
|
- protected AggregatorNodemanagerProtocol getNMAggregatorService(){
|
|
|
+
|
|
|
+ @VisibleForTesting
|
|
|
+ protected CollectorNodemanagerProtocol getNMCollectorService() {
|
|
|
Configuration conf = getConfig();
|
|
|
final YarnRPC rpc = YarnRPC.create(conf);
|
|
|
-
|
|
|
+
|
|
|
// TODO Security settings.
|
|
|
- return (AggregatorNodemanagerProtocol) rpc.getProxy(
|
|
|
- AggregatorNodemanagerProtocol.class,
|
|
|
- nmAggregatorServiceAddress, conf);
|
|
|
+ return (CollectorNodemanagerProtocol) rpc.getProxy(
|
|
|
+ CollectorNodemanagerProtocol.class,
|
|
|
+ nmCollectorServiceAddress, conf);
|
|
|
+ }
|
|
|
+
|
|
|
+ @VisibleForTesting
|
|
|
+ public String getRestServerBindAddress() {
|
|
|
+ return timelineRestServerBindAddress;
|
|
|
}
|
|
|
-
|
|
|
}
|