@@ -31,10 +31,21 @@ import java.net.URL;
 import java.net.URLConnection;
 import java.security.GeneralSecurityException;
 import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;

 import javax.net.ssl.HostnameVerifier;
 import javax.net.ssl.HttpsURLConnection;
 import javax.net.ssl.SSLSocketFactory;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedMap;
+
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.HelpFormatter;
@@ -44,8 +55,8 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authentication.client.AuthenticationException;
 import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
 import org.apache.hadoop.security.ssl.SSLFactory;
@@ -55,6 +66,7 @@ import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthentica
 import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator;
 import org.apache.hadoop.security.token.delegation.web.PseudoDelegationTokenAuthenticator;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
@@ -67,6 +79,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
 import org.codehaus.jackson.map.ObjectMapper;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
@@ -79,6 +92,7 @@ import com.sun.jersey.api.client.config.DefaultClientConfig;
 import com.sun.jersey.api.client.filter.ClientFilter;
 import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
 import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
+import com.sun.jersey.core.util.MultivaluedMapImpl;

 @Private
 @Evolving
@@ -87,6 +101,8 @@ public class TimelineClientImpl extends TimelineClient {
   private static final Log LOG = LogFactory.getLog(TimelineClientImpl.class);
   private static final String RESOURCE_URI_STR = "/ws/v1/timeline/";
   private static final ObjectMapper MAPPER = new ObjectMapper();
+  private static final String RESOURCE_URI_STR_V1 = "/ws/v1/timeline/";
+  private static final String RESOURCE_URI_STR_V2 = "/ws/v2/timeline/";
   private static final Joiner JOINER = Joiner.on("");
   public final static int DEFAULT_SOCKET_TIMEOUT = 1 * 60 * 1000; // 1 minute

@@ -107,7 +123,6 @@ public class TimelineClientImpl extends TimelineClient {
   private ConnectionConfigurator connConfigurator;
   private DelegationTokenAuthenticator authenticator;
   private DelegationTokenAuthenticatedURL.Token token;
-  private URI resURI;
   private UserGroupInformation authUgi;
   private String doAsUser;
   private Configuration configuration;
@@ -115,10 +130,20 @@ public class TimelineClientImpl extends TimelineClient {
   private TimelineWriter timelineWriter;
   private SSLFactory sslFactory;

+  private volatile String timelineServiceAddress;
+
+  // Retry parameters for identifying new timeline service
+  // TODO consider merging with connection retry
+  private int maxServiceRetries;
+  private long serviceRetryInterval;
+  private boolean timelineServiceV2 = false;
+
   @Private
   @VisibleForTesting
   TimelineClientConnectionRetry connectionRetry;

+  private TimelineEntityDispatcher entityDispatcher;
+
   // Abstract class for an operation that should be retried by timeline client
   @Private
   @VisibleForTesting
@@ -258,7 +283,12 @@ public class TimelineClientImpl extends TimelineClient {
   }

   public TimelineClientImpl() {
-    super(TimelineClientImpl.class.getName());
+    super(TimelineClientImpl.class.getName(), null);
+  }
+
+  public TimelineClientImpl(ApplicationId applicationId) {
+    super(TimelineClientImpl.class.getName(), applicationId);
+    this.timelineServiceV2 = true;
   }

   protected void serviceInit(Configuration conf) throws Exception {
@@ -287,31 +317,47 @@ public class TimelineClientImpl extends TimelineClient {
     client = new Client(new URLConnectionClientHandler(
         new TimelineURLConnectionFactory()), cc);
     TimelineJerseyRetryFilter retryFilter = new TimelineJerseyRetryFilter();
-    client.addFilter(retryFilter);
-
-    if (YarnConfiguration.useHttps(conf)) {
-      resURI = URI
-          .create(JOINER.join("https://", conf.get(
-              YarnConfiguration.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS,
-              YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS),
-              RESOURCE_URI_STR));
+    // TODO need to clean up the filter retry later.
+    if (!timelineServiceV2) {
+      client.addFilter(retryFilter);
+    }
+
+    // The old version of the timeline service gets its address from the
+    // configuration; the new one discovers it automatically (with retry).
+    if (timelineServiceV2) {
+      maxServiceRetries = conf.getInt(
+          YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
+          YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES);
+      serviceRetryInterval = conf.getLong(
+          YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS,
+          YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS);
+      entityDispatcher = new TimelineEntityDispatcher(conf);
     } else {
-      resURI = URI.create(JOINER.join("http://", conf.get(
-          YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
-          YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS),
-          RESOURCE_URI_STR));
-    }
-    LOG.info("Timeline service address: " + resURI);
-    timelineServiceVersion =
-        conf.getFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION,
-            YarnConfiguration.DEFAULT_TIMELINE_SERVICE_VERSION);
+      if (YarnConfiguration.useHttps(conf)) {
+        setTimelineServiceAddress(conf.get(
+            YarnConfiguration.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS,
+            YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS));
+      } else {
+        setTimelineServiceAddress(conf.get(
+            YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
+            YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS));
+      }
+      timelineServiceVersion =
+          conf.getFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION,
+              YarnConfiguration.DEFAULT_TIMELINE_SERVICE_VERSION);
+      LOG.info("Timeline service address: " + getTimelineServiceAddress());
+    }
     super.serviceInit(conf);
   }

   @Override
   protected void serviceStart() throws Exception {
-    timelineWriter = createTimelineWriter(
-        configuration, authUgi, client, resURI);
+    if (timelineServiceV2) {
+      entityDispatcher.start();
+    } else {
+      timelineWriter = createTimelineWriter(configuration, authUgi, client,
+          constructResURI(getConfig(), timelineServiceAddress, false));
+    }
   }

   protected TimelineWriter createTimelineWriter(Configuration conf,
@@ -333,6 +379,9 @@ public class TimelineClientImpl extends TimelineClient {
     if (this.sslFactory != null) {
       this.sslFactory.destroy();
     }
+    if (timelineServiceV2) {
+      entityDispatcher.stop();
+    }
     super.serviceStop();
   }

@@ -349,6 +398,25 @@ public class TimelineClientImpl extends TimelineClient {
     return timelineWriter.putEntities(entities);
   }

+  @Override
+  public void putEntities(
+      org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity...
+          entities) throws IOException, YarnException {
+    if (!timelineServiceV2) {
+      throw new YarnException("v.2 method is invoked on a v.1.x client");
+    }
+    entityDispatcher.dispatchEntities(true, entities);
+  }
+
+  @Override
+  public void putEntitiesAsync(
+      org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity...
+          entities) throws IOException, YarnException {
+    if (!timelineServiceV2) {
+      throw new YarnException("v.2 method is invoked on a v.1.x client");
+    }
+    entityDispatcher.dispatchEntities(false, entities);
+  }

   @Override
   public void putDomain(TimelineDomain domain) throws IOException,
@@ -356,11 +424,110 @@ public class TimelineClientImpl extends TimelineClient {
     timelineWriter.putDomain(domain);
   }

+  // Used for new timeline service only
+  @Private
+  protected void putObjects(String path, MultivaluedMap<String, String> params,
+      Object obj) throws IOException, YarnException {
+
+    int retries = verifyRestEndPointAvailable();
+
+    // timelineServiceAddress could be stale, add retry logic here.
+    boolean needRetry = true;
+    while (needRetry) {
+      try {
+        URI uri = constructResURI(getConfig(), timelineServiceAddress, true);
+        putObjects(uri, path, params, obj);
+        needRetry = false;
+      } catch (IOException e) {
+        // handle the exception; timelineServiceAddress may have been updated.
+        checkRetryWithSleep(retries, e);
+        retries--;
+      }
+    }
+  }
+
+  private int verifyRestEndPointAvailable() throws YarnException {
+    // timelineServiceAddress may not have been initialized yet,
+    // or may be stale (only for the new timeline service)
+    int retries = pollTimelineServiceAddress(this.maxServiceRetries);
+    if (timelineServiceAddress == null) {
+      String errMessage = "TimelineClient has reached the max retry times: "
+          + this.maxServiceRetries
+          + ", but failed to fetch timeline service address. Please verify"
+          + " the Timeline Auxiliary Service is configured in all the NMs";
+      LOG.error(errMessage);
+      throw new YarnException(errMessage);
+    }
+    return retries;
+  }
+
+  /**
+   * Sleep before retrying, or fail once the maximum number of retries is hit.
+   * @param retries the number of retries remaining
+   * @param e the exception raised by the last attempt
+   */
+  private void checkRetryWithSleep(int retries, IOException e)
+      throws YarnException, IOException {
+    if (retries > 0) {
+      try {
+        Thread.sleep(this.serviceRetryInterval);
+      } catch (InterruptedException ex) {
+        Thread.currentThread().interrupt();
+        throw new YarnException("Interrupted while retrying to connect to ATS");
+      }
+    } else {
+      StringBuilder msg =
+          new StringBuilder("TimelineClient has reached the max retry times: ");
+      msg.append(this.maxServiceRetries);
+      msg.append(" for service address: ");
+      msg.append(timelineServiceAddress);
+      LOG.error(msg.toString());
+      throw new IOException(msg.toString(), e);
+    }
+  }
+
+  protected void putObjects(
+      URI base, String path, MultivaluedMap<String, String> params, Object obj)
+      throws IOException, YarnException {
+    ClientResponse resp;
+    try {
+      resp = client.resource(base).path(path).queryParams(params)
+          .accept(MediaType.APPLICATION_JSON)
+          .type(MediaType.APPLICATION_JSON)
+          .put(ClientResponse.class, obj);
+    } catch (RuntimeException re) {
+      // a RuntimeException is expected if the client cannot reach the server
+      String msg =
+          "Failed to get the response from the timeline server.";
+      LOG.error(msg, re);
+      throw new IOException(re);
+    }
+    if (resp == null ||
+        resp.getClientResponseStatus() != ClientResponse.Status.OK) {
+      String msg = "Response from the timeline server is " +
+          ((resp == null) ? "null" :
+          "not successful," + " HTTP error code: " + resp.getStatus()
+          + ", Server response:\n" + resp.getEntity(String.class));
+      LOG.error(msg);
+      throw new YarnException(msg);
+    }
+  }
+
+  @Override
+  public void setTimelineServiceAddress(String address) {
+    this.timelineServiceAddress = address;
+  }
+
+  private String getTimelineServiceAddress() {
+    return this.timelineServiceAddress;
+  }
+
   @SuppressWarnings("unchecked")
   @Override
   public Token<TimelineDelegationTokenIdentifier> getDelegationToken(
       final String renewer) throws IOException, YarnException {
-    PrivilegedExceptionAction<Token<TimelineDelegationTokenIdentifier>> getDTAction =
+    PrivilegedExceptionAction<Token<TimelineDelegationTokenIdentifier>>
+        getDTAction =
         new PrivilegedExceptionAction<Token<TimelineDelegationTokenIdentifier>>() {

           @Override
@@ -369,8 +536,12 @@ public class TimelineClientImpl extends TimelineClient {
             DelegationTokenAuthenticatedURL authUrl =
                 new DelegationTokenAuthenticatedURL(authenticator,
                     connConfigurator);
+            // TODO we should add retry logic here if timelineServiceAddress is
+            // not available immediately.
             return (Token) authUrl.getDelegationToken(
-                resURI.toURL(), token, renewer, doAsUser);
+                constructResURI(getConfig(),
+                    getTimelineServiceAddress(), false).toURL(),
+                token, renewer, doAsUser);
           }
         };
     return (Token<TimelineDelegationTokenIdentifier>) operateDelegationToken(getDTAction);
@@ -393,8 +564,8 @@ public class TimelineClientImpl extends TimelineClient {
           @Override
           public Long run() throws Exception {
             // If the timeline DT to renew is different than cached, replace it.
-            // Token to set every time for retry, because when exception happens,
-            // DelegationTokenAuthenticatedURL will reset it to null;
+            // Token to set every time for retry, because when exception
+            // happens, DelegationTokenAuthenticatedURL will reset it to null;
             if (!timelineDT.equals(token.getDelegationToken())) {
               token.setDelegationToken((Token) timelineDT);
             }
|
|
|
connConfigurator);
|
|
|
// If the token service address is not available, fall back to use
|
|
|
// the configured service address.
|
|
|
- final URI serviceURI = isTokenServiceAddrEmpty ? resURI
|
|
|
+ final URI serviceURI = isTokenServiceAddrEmpty ?
|
|
|
+ constructResURI(getConfig(), getTimelineServiceAddress(), false)
|
|
|
: new URI(scheme, null, address.getHostName(),
|
|
|
- address.getPort(), RESOURCE_URI_STR, null, null);
|
|
|
+ address.getPort(), RESOURCE_URI_STR_V1, null, null);
|
|
|
return authUrl
|
|
|
.renewDelegationToken(serviceURI.toURL(), token, doAsUser);
|
|
|
}
|
|
@@ -429,9 +601,10 @@ public class TimelineClientImpl extends TimelineClient {

           @Override
           public Void run() throws Exception {
-            // If the timeline DT to cancel is different than cached, replace it.
-            // Token to set every time for retry, because when exception happens,
-            // DelegationTokenAuthenticatedURL will reset it to null;
+            // If the timeline DT to cancel is different than the cached one,
+            // replace it.
+            // Token to set every time for retry, because when exception
+            // happens, DelegationTokenAuthenticatedURL will reset it to null;
             if (!timelineDT.equals(token.getDelegationToken())) {
               token.setDelegationToken((Token) timelineDT);
             }
|
|
|
connConfigurator);
|
|
|
// If the token service address is not available, fall back to use
|
|
|
// the configured service address.
|
|
|
- final URI serviceURI = isTokenServiceAddrEmpty ? resURI
|
|
|
+ final URI serviceURI = isTokenServiceAddrEmpty ?
|
|
|
+ constructResURI(getConfig(), getTimelineServiceAddress(), false)
|
|
|
: new URI(scheme, null, address.getHostName(),
|
|
|
- address.getPort(), RESOURCE_URI_STR, null, null);
|
|
|
+ address.getPort(), RESOURCE_URI_STR_V1, null, null);
|
|
|
authUrl.cancelDelegationToken(serviceURI.toURL(), token, doAsUser);
|
|
|
return null;
|
|
|
}
|
|
@@ -452,7 +626,8 @@ public class TimelineClientImpl extends TimelineClient {

   @Override
   public String toString() {
-    return super.toString() + " with timeline server " + resURI
+    return super.toString() + " with timeline server "
+        + constructResURI(getConfig(), getTimelineServiceAddress(), false)
         + " and writer " + timelineWriter;
   }

@@ -466,6 +641,26 @@ public class TimelineClientImpl extends TimelineClient {
     return connectionRetry.retryOn(tokenRetryOp);
   }

+  /**
+   * Poll the timeline service address while it is null, at most retries times.
+   *
+   * @param retries the maximum number of times to poll
+   * @return the number of retries left
+   * @throws YarnException if interrupted while waiting between polls
+   */
+  private int pollTimelineServiceAddress(int retries) throws YarnException {
+    while (timelineServiceAddress == null && retries > 0) {
+      try {
+        Thread.sleep(this.serviceRetryInterval);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new YarnException("Interrupted while trying to connect to ATS");
+      }
+      retries--;
+    }
+    return retries;
+  }
+
   private class TimelineURLConnectionFactory
       implements HttpURLConnectionFactory {

@@ -535,6 +730,13 @@ public class TimelineClientImpl extends TimelineClient {
     connection.setReadTimeout(socketTimeout);
   }

+  private static URI constructResURI(
+      Configuration conf, String address, boolean v2) {
+    return URI.create(
+        JOINER.join(YarnConfiguration.useHttps(conf) ? "https://" : "http://",
+            address, v2 ? RESOURCE_URI_STR_V2 : RESOURCE_URI_STR_V1));
+  }
+
   public static void main(String[] argv) throws Exception {
     CommandLine cliParser = new GnuParser().parse(opts, argv);
     if (cliParser.hasOption("put")) {
@@ -716,4 +918,220 @@ public class TimelineClientImpl extends TimelineClient {
     }
   }

+  private final class EntitiesHolder extends FutureTask<Void> {
+    private final
+        org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities
+            entities;
+    private final boolean isSync;
+
+    EntitiesHolder(
+        final
+        org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities
+            entities,
+        final boolean isSync) {
+      super(new Callable<Void>() {
+        // publishEntities()
+        public Void call() throws Exception {
+          MultivaluedMap<String, String> params = new MultivaluedMapImpl();
+          params.add("appid", getContextAppId().toString());
+          params.add("async", Boolean.toString(!isSync));
+          putObjects("entities", params, entities);
+          return null;
+        }
+      });
+      this.entities = entities;
+      this.isSync = isSync;
+    }
+
+    public boolean isSync() {
+      return isSync;
+    }
+
+    public org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities
+        getEntities() {
+      return entities;
+    }
+  }
+
+  /**
+   * This class is responsible for collecting the timeline entities and
+   * publishing them asynchronously.
+   */
+  private class TimelineEntityDispatcher {
+    /**
+     * Time period for which the timeline client will wait to drain after
+     * stop.
+     */
+    private static final long DRAIN_TIME_PERIOD = 2000L;
+
+    private int numberOfAsyncsToMerge;
+    private final BlockingQueue<EntitiesHolder> timelineEntityQueue;
+    private ExecutorService executor;
+
+    TimelineEntityDispatcher(Configuration conf) {
+      timelineEntityQueue = new LinkedBlockingQueue<EntitiesHolder>();
+      numberOfAsyncsToMerge =
+          conf.getInt(YarnConfiguration.NUMBER_OF_ASYNC_ENTITIES_TO_MERGE,
+              YarnConfiguration.DEFAULT_NUMBER_OF_ASYNC_ENTITIES_TO_MERGE);
+    }
+
+    Runnable createRunnable() {
+      return new Runnable() {
+        @Override
+        public void run() {
+          try {
+            EntitiesHolder entitiesHolder;
+            while (!Thread.currentThread().isInterrupted()) {
+              // Merge all the async calls and make one push, but if it is a
+              // sync call, push immediately
+              try {
+                entitiesHolder = timelineEntityQueue.take();
+              } catch (InterruptedException ie) {
+                LOG.info("Timeline dispatcher thread was interrupted ");
+                Thread.currentThread().interrupt();
+                return;
+              }
+              if (entitiesHolder != null) {
+                publishWithoutBlockingOnQueue(entitiesHolder);
+              }
+            }
+          } finally {
+            if (!timelineEntityQueue.isEmpty()) {
+              LOG.info("Yet to publish " + timelineEntityQueue.size()
+                  + " timelineEntities, draining them now. ");
+            }
+            // Try to drain the remaining entities to be published, for at
+            // most 2 seconds
+            long timeTillweDrain =
+                System.currentTimeMillis() + DRAIN_TIME_PERIOD;
+            while (!timelineEntityQueue.isEmpty()) {
+              publishWithoutBlockingOnQueue(timelineEntityQueue.poll());
+              if (System.currentTimeMillis() > timeTillweDrain) {
+                // time elapsed, stop publishing further
+                if (!timelineEntityQueue.isEmpty()) {
+                  LOG.warn("Time to drain elapsed! Remaining "
+                      + timelineEntityQueue.size() + " timelineEntities will"
+                      + " not be published");
+                  // if some entities were not drained then we need to interrupt
+                  // the threads which had put sync EntitiesHolders to the queue.
+                  EntitiesHolder nextEntityInTheQueue = null;
+                  while ((nextEntityInTheQueue =
+                      timelineEntityQueue.poll()) != null) {
+                    nextEntityInTheQueue.cancel(true);
+                  }
+                }
+                break;
+              }
+            }
+          }
+        }

+        /**
+         * Publishes the given EntitiesHolder and returns immediately if it is
+         * a sync call, else tries to fetch EntitiesHolders from the queue in a
+         * non-blocking fashion and collate the entities if possible before
+         * publishing through REST.
+         *
+         * @param entitiesHolder the entities to be published
+         */
+        private void publishWithoutBlockingOnQueue(
+            EntitiesHolder entitiesHolder) {
+          if (entitiesHolder.isSync()) {
+            entitiesHolder.run();
+            return;
+          }
+          int count = 1;
+          while (true) {
+            // loop till we find a sync putEntities or there is nothing
+            // to take
+            EntitiesHolder nextEntityInTheQueue = timelineEntityQueue.poll();
+            if (nextEntityInTheQueue == null) {
+              // Nothing in the queue, just publish and get back to the
+              // blocked wait state
+              entitiesHolder.run();
+              break;
+            } else if (nextEntityInTheQueue.isSync()) {
+              // flush all the previous async entities first
+              entitiesHolder.run();
+              // and then flush the sync entity
+              nextEntityInTheQueue.run();
+              break;
+            } else {
+              // append all async entities together and then flush
+              entitiesHolder.getEntities().addEntities(
+                  nextEntityInTheQueue.getEntities().getEntities());
+              count++;
+              if (count == numberOfAsyncsToMerge) {
+                // Flush the entities if the number of merged async
+                // putEntities calls reaches the desired limit, to avoid
+                // collecting too many entities and delaying for a long
+                // time.
+                entitiesHolder.run();
+                break;
+              }
+            }
+          }
+        }
+      };
+    }
+
+    public void dispatchEntities(boolean sync,
+        org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity[]
+            entitiesTobePublished) throws YarnException {
+      if (executor.isShutdown()) {
+        throw new YarnException("Timeline client is in the process of stopping,"
+            + " not accepting any more TimelineEntities");
+      }
+
+      // wrap all TimelineEntity objects into a TimelineEntities object
+      org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities
+          entities =
+              new org.apache.hadoop.yarn.api.records.timelineservice.
+                  TimelineEntities();
+      for (org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
+          entity : entitiesTobePublished) {
+        entities.addEntity(entity);
+      }
+
+      // create a holder and place it in the queue
+      EntitiesHolder entitiesHolder = new EntitiesHolder(entities, sync);
+      try {
+        timelineEntityQueue.put(entitiesHolder);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new YarnException(
+            "Failed while adding entity to the queue for publishing", e);
+      }
+
+      if (sync) {
+        // In a sync call we need to wait till it is published and if there is
+        // any error then throw it back
+        try {
+          entitiesHolder.get();
+        } catch (ExecutionException e) {
+          throw new YarnException("Failed while publishing entity",
+              e.getCause());
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new YarnException("Interrupted while publishing entity", e);
+        }
+      }
+    }
+
+    public void start() {
+      executor = Executors.newSingleThreadExecutor();
+      executor.execute(createRunnable());
+    }
+
+    public void stop() {
+      LOG.info("Stopping TimelineClient.");
+      executor.shutdownNow();
+      try {
+        executor.awaitTermination(DRAIN_TIME_PERIOD, TimeUnit.MILLISECONDS);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        e.printStackTrace();
+      }
+    }
+  }
 }
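
The hunks above introduce the v2 publishing path: a constructor that takes an ApplicationId, putEntities/putEntitiesAsync overloads for timelineservice entities, and the TimelineEntityDispatcher that merges queued async puts before a single REST call. Below is a minimal, illustrative usage sketch; it is not part of the patch, it assumes a cluster with timeline service v2 enabled, and the address passed to setTimelineServiceAddress ("localhost:8188") plus the entity names are placeholders.

// Illustrative sketch only -- not part of the patch above.
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class TimelineV2PublishExample {
  public static void main(String[] args) throws Exception {
    YarnConfiguration conf = new YarnConfiguration();
    ApplicationId appId =
        ApplicationId.newInstance(System.currentTimeMillis(), 1);

    // The ApplicationId constructor puts the client on the v2 code path
    // (timelineServiceV2 = true), so the v2 putEntities* methods are usable.
    TimelineClientImpl client = new TimelineClientImpl(appId);
    client.init(conf);
    client.start();

    // In v2 the collector address is normally discovered at runtime; it is
    // set directly here only to keep the sketch self-contained (placeholder).
    client.setTimelineServiceAddress("localhost:8188");

    TimelineEntity entity = new TimelineEntity();
    entity.setType("EXAMPLE_ENTITY");
    entity.setId("entity_1");

    client.putEntitiesAsync(entity); // queued; merged with other async puts
    client.putEntities(entity);      // synchronous; blocks until published

    client.stop(); // drains pending entities for up to two seconds
  }
}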