|
@@ -25,6 +25,7 @@ import java.util.ArrayList;
|
|
import java.util.Collection;
|
|
import java.util.Collection;
|
|
import java.util.EnumSet;
|
|
import java.util.EnumSet;
|
|
import java.util.HashSet;
|
|
import java.util.HashSet;
|
|
|
|
+import java.util.Iterator;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
import java.util.Set;
|
|
import java.util.Set;
|
|
import java.util.SortedSet;
|
|
import java.util.SortedSet;
|
|
@@ -52,17 +53,21 @@ import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
|
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
|
|
|
+import org.apache.hadoop.security.UserGroupInformation;
|
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
|
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
|
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
|
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
|
|
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
|
|
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
|
|
|
|
+import org.apache.hadoop.yarn.exceptions.YarnException;
|
|
import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.EntityIdentifier;
|
|
import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.EntityIdentifier;
|
|
import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.GenericObjectMapper;
|
|
import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.GenericObjectMapper;
|
|
import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.NameValuePair;
|
|
import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.NameValuePair;
|
|
import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.TimelineReader.Field;
|
|
import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.TimelineReader.Field;
|
|
import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.TimelineStore;
|
|
import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.TimelineStore;
|
|
|
|
+import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.security.TimelineACLsManager;
|
|
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
|
|
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
|
|
import org.apache.hadoop.yarn.webapp.BadRequestException;
|
|
import org.apache.hadoop.yarn.webapp.BadRequestException;
|
|
|
|
+import org.apache.hadoop.yarn.webapp.NotFoundException;
|
|
|
|
|
|
import com.google.inject.Inject;
|
|
import com.google.inject.Inject;
|
|
import com.google.inject.Singleton;
|
|
import com.google.inject.Singleton;
|
|
@@ -75,10 +80,13 @@ public class TimelineWebServices {
|
|
private static final Log LOG = LogFactory.getLog(TimelineWebServices.class);
|
|
private static final Log LOG = LogFactory.getLog(TimelineWebServices.class);
|
|
|
|
|
|
private TimelineStore store;
|
|
private TimelineStore store;
|
|
|
|
+ private TimelineACLsManager timelineACLsManager;
|
|
|
|
|
|
@Inject
|
|
@Inject
|
|
- public TimelineWebServices(TimelineStore store) {
|
|
|
|
|
|
+ public TimelineWebServices(TimelineStore store,
|
|
|
|
+ TimelineACLsManager timelineACLsManager) {
|
|
this.store = store;
|
|
this.store = store;
|
|
|
|
+ this.timelineACLsManager = timelineACLsManager;
|
|
}
|
|
}
|
|
|
|
|
|
@XmlRootElement(name = "about")
|
|
@XmlRootElement(name = "about")
|
|
@@ -141,6 +149,9 @@ public class TimelineWebServices {
|
|
init(res);
|
|
init(res);
|
|
TimelineEntities entities = null;
|
|
TimelineEntities entities = null;
|
|
try {
|
|
try {
|
|
|
|
+ EnumSet<Field> fieldEnums = parseFieldsStr(fields, ",");
|
|
|
|
+ boolean modified = extendFields(fieldEnums);
|
|
|
|
+ UserGroupInformation callerUGI = getUser(req);
|
|
entities = store.getEntities(
|
|
entities = store.getEntities(
|
|
parseStr(entityType),
|
|
parseStr(entityType),
|
|
parseLongStr(limit),
|
|
parseLongStr(limit),
|
|
@@ -150,7 +161,33 @@ public class TimelineWebServices {
|
|
parseLongStr(fromTs),
|
|
parseLongStr(fromTs),
|
|
parsePairStr(primaryFilter, ":"),
|
|
parsePairStr(primaryFilter, ":"),
|
|
parsePairsStr(secondaryFilter, ",", ":"),
|
|
parsePairsStr(secondaryFilter, ",", ":"),
|
|
- parseFieldsStr(fields, ","));
|
|
|
|
|
|
+ fieldEnums);
|
|
|
|
+ if (entities != null) {
|
|
|
|
+ Iterator<TimelineEntity> entitiesItr =
|
|
|
|
+ entities.getEntities().iterator();
|
|
|
|
+ while (entitiesItr.hasNext()) {
|
|
|
|
+ TimelineEntity entity = entitiesItr.next();
|
|
|
|
+ try {
|
|
|
|
+ // check ACLs
|
|
|
|
+ if (!timelineACLsManager.checkAccess(callerUGI, entity)) {
|
|
|
|
+ entitiesItr.remove();
|
|
|
|
+ } else {
|
|
|
|
+ // clean up system data
|
|
|
|
+ if (modified) {
|
|
|
|
+ entity.setPrimaryFilters(null);
|
|
|
|
+ } else {
|
|
|
|
+ cleanupOwnerInfo(entity);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ } catch (YarnException e) {
|
|
|
|
+ LOG.error("Error when verifying access for user " + callerUGI
|
|
|
|
+ + " on the events of the timeline entity "
|
|
|
|
+ + new EntityIdentifier(entity.getEntityId(),
|
|
|
|
+ entity.getEntityType()), e);
|
|
|
|
+ entitiesItr.remove();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
} catch (NumberFormatException e) {
|
|
} catch (NumberFormatException e) {
|
|
throw new BadRequestException(
|
|
throw new BadRequestException(
|
|
"windowStart, windowEnd or limit is not a numeric value.");
|
|
"windowStart, windowEnd or limit is not a numeric value.");
|
|
@@ -182,9 +219,25 @@ public class TimelineWebServices {
|
|
init(res);
|
|
init(res);
|
|
TimelineEntity entity = null;
|
|
TimelineEntity entity = null;
|
|
try {
|
|
try {
|
|
|
|
+ EnumSet<Field> fieldEnums = parseFieldsStr(fields, ",");
|
|
|
|
+ boolean modified = extendFields(fieldEnums);
|
|
entity =
|
|
entity =
|
|
store.getEntity(parseStr(entityId), parseStr(entityType),
|
|
store.getEntity(parseStr(entityId), parseStr(entityType),
|
|
- parseFieldsStr(fields, ","));
|
|
|
|
|
|
+ fieldEnums);
|
|
|
|
+ if (entity != null) {
|
|
|
|
+ // check ACLs
|
|
|
|
+ UserGroupInformation callerUGI = getUser(req);
|
|
|
|
+ if (!timelineACLsManager.checkAccess(callerUGI, entity)) {
|
|
|
|
+ entity = null;
|
|
|
|
+ } else {
|
|
|
|
+ // clean up the system data
|
|
|
|
+ if (modified) {
|
|
|
|
+ entity.setPrimaryFilters(null);
|
|
|
|
+ } else {
|
|
|
|
+ cleanupOwnerInfo(entity);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
} catch (IllegalArgumentException e) {
|
|
} catch (IllegalArgumentException e) {
|
|
throw new BadRequestException(
|
|
throw new BadRequestException(
|
|
"requested invalid field.");
|
|
"requested invalid field.");
|
|
@@ -192,9 +245,15 @@ public class TimelineWebServices {
|
|
LOG.error("Error getting entity", e);
|
|
LOG.error("Error getting entity", e);
|
|
throw new WebApplicationException(e,
|
|
throw new WebApplicationException(e,
|
|
Response.Status.INTERNAL_SERVER_ERROR);
|
|
Response.Status.INTERNAL_SERVER_ERROR);
|
|
|
|
+ } catch (YarnException e) {
|
|
|
|
+ LOG.error("Error getting entity", e);
|
|
|
|
+ throw new WebApplicationException(e,
|
|
|
|
+ Response.Status.INTERNAL_SERVER_ERROR);
|
|
}
|
|
}
|
|
if (entity == null) {
|
|
if (entity == null) {
|
|
- throw new WebApplicationException(Response.Status.NOT_FOUND);
|
|
|
|
|
|
+ throw new NotFoundException("Timeline entity "
|
|
|
|
+ + new EntityIdentifier(parseStr(entityId), parseStr(entityType))
|
|
|
|
+ + " is not found");
|
|
}
|
|
}
|
|
return entity;
|
|
return entity;
|
|
}
|
|
}
|
|
@@ -217,6 +276,7 @@ public class TimelineWebServices {
|
|
init(res);
|
|
init(res);
|
|
TimelineEvents events = null;
|
|
TimelineEvents events = null;
|
|
try {
|
|
try {
|
|
|
|
+ UserGroupInformation callerUGI = getUser(req);
|
|
events = store.getEntityTimelines(
|
|
events = store.getEntityTimelines(
|
|
parseStr(entityType),
|
|
parseStr(entityType),
|
|
parseArrayStr(entityId, ","),
|
|
parseArrayStr(entityId, ","),
|
|
@@ -224,6 +284,29 @@ public class TimelineWebServices {
|
|
parseLongStr(windowStart),
|
|
parseLongStr(windowStart),
|
|
parseLongStr(windowEnd),
|
|
parseLongStr(windowEnd),
|
|
parseArrayStr(eventType, ","));
|
|
parseArrayStr(eventType, ","));
|
|
|
|
+ if (events != null) {
|
|
|
|
+ Iterator<TimelineEvents.EventsOfOneEntity> eventsItr =
|
|
|
|
+ events.getAllEvents().iterator();
|
|
|
|
+ while (eventsItr.hasNext()) {
|
|
|
|
+ TimelineEvents.EventsOfOneEntity eventsOfOneEntity = eventsItr.next();
|
|
|
|
+ try {
|
|
|
|
+ TimelineEntity entity = store.getEntity(
|
|
|
|
+ eventsOfOneEntity.getEntityId(),
|
|
|
|
+ eventsOfOneEntity.getEntityType(),
|
|
|
|
+ EnumSet.of(Field.PRIMARY_FILTERS));
|
|
|
|
+ // check ACLs
|
|
|
|
+ if (!timelineACLsManager.checkAccess(callerUGI, entity)) {
|
|
|
|
+ eventsItr.remove();
|
|
|
|
+ }
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
+ LOG.error("Error when verifying access for user " + callerUGI
|
|
|
|
+ + " on the events of the timeline entity "
|
|
|
|
+ + new EntityIdentifier(eventsOfOneEntity.getEntityId(),
|
|
|
|
+ eventsOfOneEntity.getEntityType()), e);
|
|
|
|
+ eventsItr.remove();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
} catch (NumberFormatException e) {
|
|
} catch (NumberFormatException e) {
|
|
throw new BadRequestException(
|
|
throw new BadRequestException(
|
|
"windowStart, windowEnd or limit is not a numeric value.");
|
|
"windowStart, windowEnd or limit is not a numeric value.");
|
|
@@ -252,12 +335,61 @@ public class TimelineWebServices {
|
|
if (entities == null) {
|
|
if (entities == null) {
|
|
return new TimelinePutResponse();
|
|
return new TimelinePutResponse();
|
|
}
|
|
}
|
|
|
|
+ UserGroupInformation callerUGI = getUser(req);
|
|
try {
|
|
try {
|
|
List<EntityIdentifier> entityIDs = new ArrayList<EntityIdentifier>();
|
|
List<EntityIdentifier> entityIDs = new ArrayList<EntityIdentifier>();
|
|
|
|
+ TimelineEntities entitiesToPut = new TimelineEntities();
|
|
|
|
+ List<TimelinePutResponse.TimelinePutError> errors =
|
|
|
|
+ new ArrayList<TimelinePutResponse.TimelinePutError>();
|
|
for (TimelineEntity entity : entities.getEntities()) {
|
|
for (TimelineEntity entity : entities.getEntities()) {
|
|
EntityIdentifier entityID =
|
|
EntityIdentifier entityID =
|
|
new EntityIdentifier(entity.getEntityId(), entity.getEntityType());
|
|
new EntityIdentifier(entity.getEntityId(), entity.getEntityType());
|
|
|
|
+
|
|
|
|
+ // check if there is existing entity
|
|
|
|
+ try {
|
|
|
|
+ TimelineEntity existingEntity =
|
|
|
|
+ store.getEntity(entityID.getId(), entityID.getType(),
|
|
|
|
+ EnumSet.of(Field.PRIMARY_FILTERS));
|
|
|
|
+ if (existingEntity != null
|
|
|
|
+ && !timelineACLsManager.checkAccess(callerUGI, existingEntity)) {
|
|
|
|
+ throw new YarnException("The timeline entity " + entityID
|
|
|
|
+ + " was not put by " + callerUGI + " before");
|
|
|
|
+ }
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
+ // Skip the entity which already exists and was put by others
|
|
|
|
+ LOG.warn("Skip the timeline entity: " + entityID + ", because "
|
|
|
|
+ + e.getMessage());
|
|
|
|
+ TimelinePutResponse.TimelinePutError error =
|
|
|
|
+ new TimelinePutResponse.TimelinePutError();
|
|
|
|
+ error.setEntityId(entityID.getId());
|
|
|
|
+ error.setEntityType(entityID.getType());
|
|
|
|
+ error.setErrorCode(
|
|
|
|
+ TimelinePutResponse.TimelinePutError.ACCESS_DENIED);
|
|
|
|
+ errors.add(error);
|
|
|
|
+ continue;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // inject owner information for the access check
|
|
|
|
+ try {
|
|
|
|
+ injectOwnerInfo(entity,
|
|
|
|
+ callerUGI == null ? "" : callerUGI.getShortUserName());
|
|
|
|
+ } catch (YarnException e) {
|
|
|
|
+ // Skip the entity which messes up the primary filter and record the
|
|
|
|
+ // error
|
|
|
|
+ LOG.warn("Skip the timeline entity: " + entityID + ", because "
|
|
|
|
+ + e.getMessage());
|
|
|
|
+ TimelinePutResponse.TimelinePutError error =
|
|
|
|
+ new TimelinePutResponse.TimelinePutError();
|
|
|
|
+ error.setEntityId(entityID.getId());
|
|
|
|
+ error.setEntityType(entityID.getType());
|
|
|
|
+ error.setErrorCode(
|
|
|
|
+ TimelinePutResponse.TimelinePutError.SYSTEM_FILTER_CONFLICT);
|
|
|
|
+ errors.add(error);
|
|
|
|
+ continue;
|
|
|
|
+ }
|
|
|
|
+
|
|
entityIDs.add(entityID);
|
|
entityIDs.add(entityID);
|
|
|
|
+ entitiesToPut.addEntity(entity);
|
|
if (LOG.isDebugEnabled()) {
|
|
if (LOG.isDebugEnabled()) {
|
|
LOG.debug("Storing the entity " + entityID + ", JSON-style content: "
|
|
LOG.debug("Storing the entity " + entityID + ", JSON-style content: "
|
|
+ TimelineUtils.dumpTimelineRecordtoJSON(entity));
|
|
+ TimelineUtils.dumpTimelineRecordtoJSON(entity));
|
|
@@ -266,7 +398,10 @@ public class TimelineWebServices {
|
|
if (LOG.isDebugEnabled()) {
|
|
if (LOG.isDebugEnabled()) {
|
|
LOG.debug("Storing entities: " + CSV_JOINER.join(entityIDs));
|
|
LOG.debug("Storing entities: " + CSV_JOINER.join(entityIDs));
|
|
}
|
|
}
|
|
- return store.put(entities);
|
|
|
|
|
|
+ TimelinePutResponse response = store.put(entitiesToPut);
|
|
|
|
+ // add the errors of timeline system filter key conflict
|
|
|
|
+ response.addErrors(errors);
|
|
|
|
+ return response;
|
|
} catch (IOException e) {
|
|
} catch (IOException e) {
|
|
LOG.error("Error putting entities", e);
|
|
LOG.error("Error putting entities", e);
|
|
throw new WebApplicationException(e,
|
|
throw new WebApplicationException(e,
|
|
@@ -350,6 +485,14 @@ public class TimelineWebServices {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private static boolean extendFields(EnumSet<Field> fieldEnums) {
|
|
|
|
+ boolean modified = false;
|
|
|
|
+ if (fieldEnums != null && !fieldEnums.contains(Field.PRIMARY_FILTERS)) {
|
|
|
|
+ fieldEnums.add(Field.PRIMARY_FILTERS);
|
|
|
|
+ modified = true;
|
|
|
|
+ }
|
|
|
|
+ return modified;
|
|
|
|
+ }
|
|
private static Long parseLongStr(String str) {
|
|
private static Long parseLongStr(String str) {
|
|
return str == null ? null : Long.parseLong(str.trim());
|
|
return str == null ? null : Long.parseLong(str.trim());
|
|
}
|
|
}
|
|
@@ -358,4 +501,34 @@ public class TimelineWebServices {
|
|
return str == null ? null : str.trim();
|
|
return str == null ? null : str.trim();
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private static UserGroupInformation getUser(HttpServletRequest req) {
|
|
|
|
+ String remoteUser = req.getRemoteUser();
|
|
|
|
+ UserGroupInformation callerUGI = null;
|
|
|
|
+ if (remoteUser != null) {
|
|
|
|
+ callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
|
|
|
|
+ }
|
|
|
|
+ return callerUGI;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private static void injectOwnerInfo(TimelineEntity timelineEntity,
|
|
|
|
+ String owner) throws YarnException {
|
|
|
|
+ if (timelineEntity.getPrimaryFilters() != null &&
|
|
|
|
+ timelineEntity.getPrimaryFilters().containsKey(
|
|
|
|
+ TimelineStore.SystemFilter.ENTITY_OWNER)) {
|
|
|
|
+ throw new YarnException(
|
|
|
|
+ "User should not use the timeline system filter key: "
|
|
|
|
+ + TimelineStore.SystemFilter.ENTITY_OWNER);
|
|
|
|
+ }
|
|
|
|
+ timelineEntity.addPrimaryFilter(
|
|
|
|
+ TimelineStore.SystemFilter.ENTITY_OWNER
|
|
|
|
+ .toString(), owner);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private static void cleanupOwnerInfo(TimelineEntity timelineEntity) {
|
|
|
|
+ if (timelineEntity.getPrimaryFilters() != null) {
|
|
|
|
+ timelineEntity.getPrimaryFilters().remove(
|
|
|
|
+ TimelineStore.SystemFilter.ENTITY_OWNER.toString());
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
}
|
|
}
|