|
@@ -34,6 +34,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
|
import org.apache.hadoop.service.CompositeService;
|
|
|
+import org.apache.hadoop.yarn.api.records.timeline.TimelineHealth;
|
|
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineDomain;
|
|
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation;
|
|
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
|
|
@@ -71,6 +72,9 @@ public abstract class TimelineCollector extends CompositeService {
|
|
|
|
|
|
private volatile boolean isStopped = false;
|
|
|
|
|
|
+ private int maxWriteRetries;
|
|
|
+ private long writeRetryInterval;
|
|
|
+
|
|
|
/**
 * Creates a timeline collector with the given service name.
 *
 * @param name the name handed unchanged to the {@code CompositeService}
 *     superclass constructor.
 */
public TimelineCollector(String name) {
|
|
|
super(name);
|
|
|
}
|
|
@@ -86,6 +90,13 @@ public abstract class TimelineCollector extends CompositeService {
|
|
|
new ArrayBlockingQueue<>(capacity));
|
|
|
pool.setRejectedExecutionHandler(
|
|
|
new ThreadPoolExecutor.DiscardOldestPolicy());
|
|
|
+
|
|
|
+ maxWriteRetries =
|
|
|
+ conf.getInt(YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
|
|
|
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES);
|
|
|
+ writeRetryInterval = conf.getLong(
|
|
|
+ YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS,
|
|
|
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -153,18 +164,54 @@ public abstract class TimelineCollector extends CompositeService {
|
|
|
UserGroupInformation callerUgi) throws IOException {
|
|
|
LOG.debug("putEntities(entities={}, callerUgi={})", entities, callerUgi);
|
|
|
|
|
|
- TimelineWriteResponse response;
|
|
|
- // synchronize on the writer object so that no other threads can
|
|
|
- // flush the writer buffer concurrently and swallow any exception
|
|
|
- // caused by the timeline enitites that are being put here.
|
|
|
- synchronized (writer) {
|
|
|
- response = writeTimelineEntities(entities, callerUgi);
|
|
|
- flushBufferedTimelineEntities();
|
|
|
+ TimelineWriteResponse response = null;
|
|
|
+ try {
|
|
|
+ boolean isStorageUp = checkRetryWithSleep();
|
|
|
+ if (isStorageUp) {
|
|
|
+ // synchronize on the writer object so that no other threads can
|
|
|
+ // flush the writer buffer concurrently and swallow any exception
|
|
|
+ // caused by the timeline entities that are being put here.
|
|
|
+ synchronized (writer) {
|
|
|
+ response = writeTimelineEntities(entities, callerUgi);
|
|
|
+ flushBufferedTimelineEntities();
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ String msg = String.format("Failed to putEntities(" +
|
|
|
+ "entities=%s, callerUgi=%s) as Timeline Storage is Down",
|
|
|
+ entities, callerUgi);
|
|
|
+ throw new IOException(msg);
|
|
|
+ }
|
|
|
+ } catch (InterruptedException ex) {
|
|
|
+ String msg = String.format("Interrupted while retrying to putEntities(" +
|
|
|
+ "entities=%s, callerUgi=%s)", entities, callerUgi);
|
|
|
+ throw new IOException(msg);
|
|
|
}
|
|
|
|
|
|
return response;
|
|
|
}
|
|
|
|
|
|
+
|
|
|
+ private boolean checkRetryWithSleep() throws InterruptedException {
|
|
|
+ int retries = maxWriteRetries;
|
|
|
+ while (retries > 0) {
|
|
|
+ TimelineHealth timelineHealth = writer.getHealthStatus();
|
|
|
+ if (timelineHealth.getHealthStatus().equals(
|
|
|
+ TimelineHealth.TimelineHealthStatus.RUNNING)) {
|
|
|
+ return true;
|
|
|
+ } else {
|
|
|
+ try {
|
|
|
+ Thread.sleep(writeRetryInterval);
|
|
|
+ } catch (InterruptedException ex) {
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ throw ex;
|
|
|
+ }
|
|
|
+ retries--;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
/**
|
|
|
* Add or update an domain. If the domain already exists, only the owner
|
|
|
* and the admin can update it.
|
|
@@ -179,11 +226,25 @@ public abstract class TimelineCollector extends CompositeService {
|
|
|
UserGroupInformation callerUgi) throws IOException {
|
|
|
LOG.debug("putDomain(domain={}, callerUgi={})", domain, callerUgi);
|
|
|
|
|
|
- TimelineWriteResponse response;
|
|
|
- synchronized (writer) {
|
|
|
- final TimelineCollectorContext context = getTimelineEntityContext();
|
|
|
- response = writer.write(context, domain);
|
|
|
- flushBufferedTimelineEntities();
|
|
|
+ TimelineWriteResponse response = null;
|
|
|
+ try {
|
|
|
+ boolean isStorageUp = checkRetryWithSleep();
|
|
|
+ if (isStorageUp) {
|
|
|
+ synchronized (writer) {
|
|
|
+ final TimelineCollectorContext context = getTimelineEntityContext();
|
|
|
+ response = writer.write(context, domain);
|
|
|
+ flushBufferedTimelineEntities();
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ String msg = String.format("Failed to putDomain(" +
|
|
|
+ "domain=%s, callerUgi=%s) as Timeline Storage is Down",
|
|
|
+ domain, callerUgi);
|
|
|
+ throw new IOException(msg);
|
|
|
+ }
|
|
|
+ } catch (InterruptedException ex) {
|
|
|
+ String msg = String.format("Interrupted while retrying to putDomain(" +
|
|
|
+ "domain=%s, callerUgi=%s)", domain, callerUgi);
|
|
|
+ throw new IOException(msg);
|
|
|
}
|
|
|
|
|
|
return response;
|