|
@@ -24,11 +24,15 @@ import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.ipc.RPC;
|
|
|
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
|
|
+import org.apache.hadoop.yarn.api.ApplicationHistoryProtocol;
|
|
|
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
|
|
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
|
|
+import org.apache.hadoop.yarn.client.AHSProxy;
|
|
|
import org.apache.hadoop.yarn.client.ClientRMProxy;
|
|
|
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
+import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
|
|
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
|
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
|
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
|
@@ -41,38 +45,73 @@ public class AppReportFetcher {
|
|
|
private static final Log LOG = LogFactory.getLog(AppReportFetcher.class);
|
|
|
private final Configuration conf;
|
|
|
private final ApplicationClientProtocol applicationsManager;
|
|
|
+ private final ApplicationHistoryProtocol historyManager;
|
|
|
private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
|
|
|
-
|
|
|
+ private boolean isAHSEnabled;
|
|
|
+
|
|
|
/**
|
|
|
- * Create a new Connection to the RM to fetch Application reports.
|
|
|
+ * Create a new Connection to the RM/Application History Server
|
|
|
+ * to fetch Application reports.
|
|
|
* @param conf the conf to use to know where the RM is.
|
|
|
*/
|
|
|
public AppReportFetcher(Configuration conf) {
|
|
|
+ if (conf.getBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED,
|
|
|
+ YarnConfiguration.DEFAULT_APPLICATION_HISTORY_ENABLED)) {
|
|
|
+ isAHSEnabled = true;
|
|
|
+ }
|
|
|
this.conf = conf;
|
|
|
try {
|
|
|
applicationsManager = ClientRMProxy.createRMProxy(conf,
|
|
|
ApplicationClientProtocol.class);
|
|
|
+ if (isAHSEnabled) {
|
|
|
+ historyManager = getAHSProxy(conf);
|
|
|
+ } else {
|
|
|
+ this.historyManager = null;
|
|
|
+ }
|
|
|
} catch (IOException e) {
|
|
|
throw new YarnRuntimeException(e);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Just call directly into the applicationsManager given instead of creating
|
|
|
- * a remote connection to it. This is mostly for when the Proxy is running
|
|
|
- * as part of the RM already.
|
|
|
+ * Create a direct connection to RM instead of a remote connection when
|
|
|
+ * the proxy is running as part of the RM. Also create a remote connection to
|
|
|
+ * Application History Server if it is enabled.
|
|
|
* @param conf the configuration to use
|
|
|
* @param applicationsManager what to use to get the RM reports.
|
|
|
*/
|
|
|
public AppReportFetcher(Configuration conf, ApplicationClientProtocol applicationsManager) {
|
|
|
+ if (conf.getBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED,
|
|
|
+ YarnConfiguration.DEFAULT_APPLICATION_HISTORY_ENABLED)) {
|
|
|
+ isAHSEnabled = true;
|
|
|
+ }
|
|
|
this.conf = conf;
|
|
|
this.applicationsManager = applicationsManager;
|
|
|
+ if (isAHSEnabled) {
|
|
|
+ try {
|
|
|
+ historyManager = getAHSProxy(conf);
|
|
|
+ } catch (IOException e) {
|
|
|
+ throw new YarnRuntimeException(e);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ this.historyManager = null;
|
|
|
+ }
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+ protected ApplicationHistoryProtocol getAHSProxy(Configuration configuration)
|
|
|
+ throws IOException {
|
|
|
+ return AHSProxy.createAHSProxy(configuration,
|
|
|
+ ApplicationHistoryProtocol.class,
|
|
|
+ configuration.getSocketAddr(YarnConfiguration.TIMELINE_SERVICE_ADDRESS,
|
|
|
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ADDRESS,
|
|
|
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_PORT));
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
- * Get a report for the specified app.
|
|
|
- * @param appId the id of the application to get.
|
|
|
- * @return the ApplicationReport for that app.
|
|
|
+ * Get an application report for the specified application id from the RM and
|
|
|
+ * fall back to the Application History Server if not found in RM.
|
|
|
+ * @param appId id of the application to get.
|
|
|
+ * @return the ApplicationReport for the appId.
|
|
|
* @throws YarnException on any error.
|
|
|
* @throws IOException
|
|
|
*/
|
|
@@ -81,9 +120,22 @@ public class AppReportFetcher {
|
|
|
GetApplicationReportRequest request = recordFactory
|
|
|
.newRecordInstance(GetApplicationReportRequest.class);
|
|
|
request.setApplicationId(appId);
|
|
|
-
|
|
|
- GetApplicationReportResponse response = applicationsManager
|
|
|
- .getApplicationReport(request);
|
|
|
+
|
|
|
+ GetApplicationReportResponse response;
|
|
|
+ try {
|
|
|
+ response = applicationsManager.getApplicationReport(request);
|
|
|
+ } catch (YarnException e) {
|
|
|
+ if (!isAHSEnabled) {
|
|
|
+ // Just throw it as usual if historyService is not enabled.
|
|
|
+ throw e;
|
|
|
+ }
|
|
|
+ // Even if history-service is enabled, treat all exceptions still the same
|
|
|
+ // except the following
|
|
|
+ if (!(e.getClass() == ApplicationNotFoundException.class)) {
|
|
|
+ throw e;
|
|
|
+ }
|
|
|
+ response = historyManager.getApplicationReport(request);
|
|
|
+ }
|
|
|
return response.getApplicationReport();
|
|
|
}
|
|
|
|
|
@@ -91,5 +143,8 @@ public class AppReportFetcher {
|
|
|
if (this.applicationsManager != null) {
|
|
|
RPC.stopProxy(this.applicationsManager);
|
|
|
}
|
|
|
+ if (this.historyManager != null) {
|
|
|
+ RPC.stopProxy(this.historyManager);
|
|
|
+ }
|
|
|
}
|
|
|
}
|