|
@@ -69,6 +69,7 @@ public class TimelineV2ClientImpl extends TimelineV2Client {
|
|
|
private static final String RESOURCE_URI_STR_V2 = "/ws/v2/timeline/";
|
|
|
|
|
|
private TimelineEntityDispatcher entityDispatcher;
|
|
|
+ private TimelineEntityDispatcher subAppEntityDispatcher;
|
|
|
private volatile String timelineServiceAddress;
|
|
|
@VisibleForTesting
|
|
|
volatile Token currentTimelineToken = null;
|
|
@@ -124,6 +125,7 @@ public class TimelineV2ClientImpl extends TimelineV2Client {
|
|
|
YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS,
|
|
|
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS);
|
|
|
entityDispatcher = new TimelineEntityDispatcher(conf);
|
|
|
+ subAppEntityDispatcher = new TimelineEntityDispatcher(conf);
|
|
|
super.serviceInit(conf);
|
|
|
}
|
|
|
|
|
@@ -131,24 +133,38 @@ public class TimelineV2ClientImpl extends TimelineV2Client {
|
|
|
protected void serviceStart() throws Exception {
|
|
|
super.serviceStart();
|
|
|
entityDispatcher.start();
|
|
|
+ subAppEntityDispatcher.start();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
protected void serviceStop() throws Exception {
|
|
|
entityDispatcher.stop();
|
|
|
+ subAppEntityDispatcher.stop();
|
|
|
super.serviceStop();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void putEntities(TimelineEntity... entities)
|
|
|
throws IOException, YarnException {
|
|
|
- entityDispatcher.dispatchEntities(true, entities);
|
|
|
+ entityDispatcher.dispatchEntities(true, entities, false);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void putEntitiesAsync(TimelineEntity... entities)
|
|
|
throws IOException, YarnException {
|
|
|
- entityDispatcher.dispatchEntities(false, entities);
|
|
|
+ entityDispatcher.dispatchEntities(false, entities, false);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void putSubAppEntities(TimelineEntity... entities)
|
|
|
+ throws IOException, YarnException {
|
|
|
+ subAppEntityDispatcher.dispatchEntities(true, entities, true);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void putSubAppEntitiesAsync(TimelineEntity... entities)
|
|
|
+ throws IOException, YarnException {
|
|
|
+ subAppEntityDispatcher.dispatchEntities(false, entities, true);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -346,13 +362,15 @@ public class TimelineV2ClientImpl extends TimelineV2Client {
|
|
|
private final TimelineEntities entities;
|
|
|
private final boolean isSync;
|
|
|
|
|
|
- EntitiesHolder(final TimelineEntities entities, final boolean isSync) {
|
|
|
+ EntitiesHolder(final TimelineEntities entities, final boolean isSync,
|
|
|
+ final boolean subappwrite) {
|
|
|
super(new Callable<Void>() {
|
|
|
// publishEntities()
|
|
|
public Void call() throws Exception {
|
|
|
MultivaluedMap<String, String> params = new MultivaluedMapImpl();
|
|
|
params.add("appid", getContextAppId().toString());
|
|
|
params.add("async", Boolean.toString(!isSync));
|
|
|
+ params.add("subappwrite", Boolean.toString(subappwrite));
|
|
|
putObjects("entities", params, entities);
|
|
|
return null;
|
|
|
}
|
|
@@ -496,7 +514,8 @@ public class TimelineV2ClientImpl extends TimelineV2Client {
|
|
|
}
|
|
|
|
|
|
public void dispatchEntities(boolean sync,
|
|
|
- TimelineEntity[] entitiesTobePublished) throws YarnException {
|
|
|
+ TimelineEntity[] entitiesTobePublished, boolean subappwrite)
|
|
|
+ throws YarnException {
|
|
|
if (executor.isShutdown()) {
|
|
|
throw new YarnException("Timeline client is in the process of stopping,"
|
|
|
+ " not accepting any more TimelineEntities");
|
|
@@ -509,7 +528,8 @@ public class TimelineV2ClientImpl extends TimelineV2Client {
|
|
|
}
|
|
|
|
|
|
// created a holder and place it in queue
|
|
|
- EntitiesHolder entitiesHolder = new EntitiesHolder(entities, sync);
|
|
|
+ EntitiesHolder entitiesHolder =
|
|
|
+ new EntitiesHolder(entities, sync, subappwrite);
|
|
|
try {
|
|
|
timelineEntityQueue.put(entitiesHolder);
|
|
|
} catch (InterruptedException e) {
|