浏览代码

YARN-1634. Added a testable in-memory implementation of ApplicationTimelineStore. Contributed by Zhijie Shen.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1564583 13f79535-47bb-0310-9956-ffa450edef68
Vinod Kumar Vavilapalli 11 年之前
父节点
当前提交
96578f0e01

+ 5 - 0
hadoop-yarn-project/CHANGES.txt

@@ -90,6 +90,9 @@ Release 2.4.0 - UNRELEASED
     implementing different storage impls for storing timeline information.
     (Billie Rinaldi via vinodkv)
 
+    YARN-1634. Added a testable in-memory implementation of
+    ApplicationTimelineStore. (Zhijie Shen via vinodkv)
+
   IMPROVEMENTS
 
     YARN-1007. Enhance History Reader interface for Containers. (Mayank Bansal via
@@ -126,6 +129,8 @@ Release 2.4.0 - UNRELEASED
     be available across RM failover by making using of a remote
     configuration-provider. (Xuan Gong via vinodkv)
 
+  OPTIMIZATIONS
+
   BUG FIXES
 
     YARN-935. Correcting pom.xml to build applicationhistoryserver module

+ 89 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/apptimeline/ATSEntity.java

@@ -50,7 +50,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
 @XmlAccessorType(XmlAccessType.NONE)
 @Public
 @Unstable
-public class ATSEntity {
+public class ATSEntity implements Comparable<ATSEntity> {
 
   private String entityType;
   private String entityId;
@@ -310,4 +310,92 @@ public class ATSEntity {
     this.otherInfo = otherInfo;
   }
 
+  @Override
+  public int hashCode() {
+    // generated by eclipse
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((entityId == null) ? 0 : entityId.hashCode());
+    result =
+        prime * result + ((entityType == null) ? 0 : entityType.hashCode());
+    result = prime * result + ((events == null) ? 0 : events.hashCode());
+    result = prime * result + ((otherInfo == null) ? 0 : otherInfo.hashCode());
+    result =
+        prime * result
+            + ((primaryFilters == null) ? 0 : primaryFilters.hashCode());
+    result =
+        prime * result
+            + ((relatedEntities == null) ? 0 : relatedEntities.hashCode());
+    result = prime * result + ((startTime == null) ? 0 : startTime.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    // generated by eclipse
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    ATSEntity other = (ATSEntity) obj;
+    if (entityId == null) {
+      if (other.entityId != null)
+        return false;
+    } else if (!entityId.equals(other.entityId))
+      return false;
+    if (entityType == null) {
+      if (other.entityType != null)
+        return false;
+    } else if (!entityType.equals(other.entityType))
+      return false;
+    if (events == null) {
+      if (other.events != null)
+        return false;
+    } else if (!events.equals(other.events))
+      return false;
+    if (otherInfo == null) {
+      if (other.otherInfo != null)
+        return false;
+    } else if (!otherInfo.equals(other.otherInfo))
+      return false;
+    if (primaryFilters == null) {
+      if (other.primaryFilters != null)
+        return false;
+    } else if (!primaryFilters.equals(other.primaryFilters))
+      return false;
+    if (relatedEntities == null) {
+      if (other.relatedEntities != null)
+        return false;
+    } else if (!relatedEntities.equals(other.relatedEntities))
+      return false;
+    if (startTime == null) {
+      if (other.startTime != null)
+        return false;
+    } else if (!startTime.equals(other.startTime))
+      return false;
+    return true;
+  }
+
+  @Override
+  public int compareTo(ATSEntity other) {
+    int comparison = entityType.compareTo(other.entityType);
+    if (comparison == 0) {
+      long thisStartTime =
+          startTime == null ? Long.MIN_VALUE : startTime;
+      long otherStartTime =
+          other.startTime == null ? Long.MIN_VALUE : other.startTime;
+      if (thisStartTime > otherStartTime) {
+        return -1;
+      } else if (thisStartTime < otherStartTime) {
+        return 1;
+      } else {
+        return entityId.compareTo(other.entityId);
+      }
+    } else {
+      return comparison;
+    }
+  }
+
 }

+ 100 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/EntityId.java

@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * The unique identifier for an entity
+ */
+@Private
+@Unstable
+public class EntityId implements Comparable<EntityId> {
+
+  private String id;
+  private String type;
+
+  public EntityId(String id, String type) {
+    this.id = id;
+    this.type = type;
+  }
+
+  /**
+   * Get the entity Id.
+   * @return The entity Id.
+   */
+  public String getId() {
+    return id;
+  }
+
+  /**
+   * Get the entity type.
+   * @return The entity type.
+   */
+  public String getType() {
+    return type;
+  }
+
+  @Override
+  public int compareTo(EntityId other) {
+    int c = type.compareTo(other.type);
+    if (c != 0) return c;
+    return id.compareTo(other.id);
+  }
+
+  @Override
+  public int hashCode() {
+    // generated by eclipse
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((id == null) ? 0 : id.hashCode());
+    result = prime * result + ((type == null) ? 0 : type.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    // generated by eclipse
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    EntityId other = (EntityId) obj;
+    if (id == null) {
+      if (other.id != null)
+        return false;
+    } else if (!id.equals(other.id))
+      return false;
+    if (type == null) {
+      if (other.type != null)
+        return false;
+    } else if (!type.equals(other.type))
+      return false;
+    return true;
+  }
+
+  @Override
+  public String toString() {
+    return "{ id: " + id + ", type: "+ type + " }";
+  }
+
+}

+ 288 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/MemoryApplicationTimelineStore.java

@@ -0,0 +1,288 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.SortedSet;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntities;
+import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntity;
+import org.apache.hadoop.yarn.api.records.apptimeline.ATSEvent;
+import org.apache.hadoop.yarn.api.records.apptimeline.ATSEvents;
+import org.apache.hadoop.yarn.api.records.apptimeline.ATSEvents.ATSEventsOfOneEntity;
+import org.apache.hadoop.yarn.api.records.apptimeline.ATSPutErrors;
+import org.apache.hadoop.yarn.api.records.apptimeline.ATSPutErrors.ATSPutError;
+
+/**
+ * In-memory implementation of {@link ApplicationTimelineStore}. This
+ * implementation is for test purpose only. If users improperly instantiate it,
+ * they may encounter reading and writing history data in different memory
+ * store.
+ * 
+ */
+@Private
+@Unstable
+public class MemoryApplicationTimelineStore
+    extends AbstractService implements ApplicationTimelineStore {
+
+  private Map<EntityId, ATSEntity> entities =
+      new HashMap<EntityId, ATSEntity>();
+
+  public MemoryApplicationTimelineStore() {
+    super(MemoryApplicationTimelineStore.class.getName());
+  }
+
+  @Override
+  public ATSEntities getEntities(String entityType, Long limit,
+      Long windowStart, Long windowEnd, NameValuePair primaryFilter,
+      Collection<NameValuePair> secondaryFilters, EnumSet<Field> fields) {
+    if (limit == null) {
+      limit = DEFAULT_LIMIT;
+    }
+    if (windowStart == null) {
+      windowStart = Long.MIN_VALUE;
+    }
+    if (windowEnd == null) {
+      windowEnd = Long.MAX_VALUE;
+    }
+    if (fields == null) {
+      fields = EnumSet.allOf(Field.class);
+    }
+    List<ATSEntity> entitiesSelected = new ArrayList<ATSEntity>();
+    for (ATSEntity entity : new PriorityQueue<ATSEntity>(entities.values())) {
+      if (entitiesSelected.size() >= limit) {
+        break;
+      }
+      if (!entity.getEntityType().equals(entityType)) {
+        continue;
+      }
+      if (entity.getStartTime() <= windowStart) {
+        continue;
+      }
+      if (entity.getStartTime() > windowEnd) {
+        continue;
+      }
+      if (primaryFilter != null &&
+          !matchFilter(entity.getPrimaryFilters(), primaryFilter)) {
+        continue;
+      }
+      if (secondaryFilters != null) { // OR logic
+        boolean flag = false;
+        for (NameValuePair secondaryFilter : secondaryFilters) {
+          if (secondaryFilter != null &&
+              matchFilter(entity.getOtherInfo(), secondaryFilter)) {
+            flag = true;
+            break;
+          }
+        }
+        if (!flag) {
+          continue;
+        }
+      }
+      entitiesSelected.add(entity);
+    }
+    List<ATSEntity> entitiesToReturn = new ArrayList<ATSEntity>();
+    for (ATSEntity entitySelected : entitiesSelected) {
+      entitiesToReturn.add(maskFields(entitySelected, fields));
+    }
+    Collections.sort(entitiesToReturn);
+    ATSEntities entitiesWrapper = new ATSEntities();
+    entitiesWrapper.setEntities(entitiesToReturn);
+    return entitiesWrapper;
+  }
+
+  @Override
+  public ATSEntity getEntity(String entityId, String entityType,
+      EnumSet<Field> fieldsToRetrieve) {
+    if (fieldsToRetrieve == null) {
+      fieldsToRetrieve = EnumSet.allOf(Field.class);
+    }
+    ATSEntity entity = entities.get(new EntityId(entityId, entityType));
+    if (entity == null) {
+      return null;
+    } else {
+      return maskFields(entity, fieldsToRetrieve);
+    }
+  }
+
+  @Override
+  public ATSEvents getEntityTimelines(String entityType,
+      SortedSet<String> entityIds, Long limit, Long windowStart,
+      Long windowEnd,
+      Set<String> eventTypes) {
+    ATSEvents allEvents = new ATSEvents();
+    if (entityIds == null) {
+      return allEvents;
+    }
+    if (limit == null) {
+      limit = DEFAULT_LIMIT;
+    }
+    if (windowStart == null) {
+      windowStart = Long.MIN_VALUE;
+    }
+    if (windowEnd == null) {
+      windowEnd = Long.MAX_VALUE;
+    }
+    for (String entityId : entityIds) {
+      EntityId entityID = new EntityId(entityId, entityType);
+      ATSEntity entity = entities.get(entityID);
+      if (entity == null) {
+        continue;
+      }
+      ATSEventsOfOneEntity events = new ATSEventsOfOneEntity();
+      events.setEntityId(entityId);
+      events.setEntityType(entityType);
+      for (ATSEvent event : entity.getEvents()) {
+        if (events.getEvents().size() >= limit) {
+          break;
+        }
+        if (event.getTimestamp() <= windowStart) {
+          continue;
+        }
+        if (event.getTimestamp() > windowEnd) {
+          continue;
+        }
+        if (eventTypes != null && !eventTypes.contains(event.getEventType())) {
+          continue;
+        }
+        events.addEvent(event);
+      }
+      allEvents.addEvent(events);
+    }
+    return allEvents;
+  }
+
+  @Override
+  public ATSPutErrors put(ATSEntities data) {
+    ATSPutErrors errors = new ATSPutErrors();
+    for (ATSEntity entity : data.getEntities()) {
+      EntityId entityId =
+          new EntityId(entity.getEntityId(), entity.getEntityType());
+      // store entity info in memory
+      ATSEntity existingEntity = entities.get(entityId);
+      if (existingEntity == null) {
+        existingEntity = new ATSEntity();
+        existingEntity.setEntityId(entity.getEntityId());
+        existingEntity.setEntityType(entity.getEntityType());
+        existingEntity.setStartTime(entity.getStartTime());
+        entities.put(entityId, existingEntity);
+      }
+      if (entity.getEvents() != null) {
+        if (existingEntity.getEvents() == null) {
+          existingEntity.setEvents(entity.getEvents());
+        } else {
+          existingEntity.addEvents(entity.getEvents());
+        }
+        Collections.sort(existingEntity.getEvents());
+      }
+      // check startTime
+      if (existingEntity.getStartTime() == null) {
+        if (existingEntity.getEvents() == null
+            || existingEntity.getEvents().isEmpty()) {
+          ATSPutError error = new ATSPutError();
+          error.setEntityId(entityId.getId());
+          error.setEntityType(entityId.getType());
+          error.setErrorCode(1);
+          errors.addError(error);
+          entities.remove(entityId);
+          continue;
+        } else {
+          existingEntity.setStartTime(entity.getEvents().get(0).getTimestamp());
+        }
+      }
+      if (entity.getPrimaryFilters() != null) {
+        if (existingEntity.getPrimaryFilters() == null) {
+          existingEntity.setPrimaryFilters(entity.getPrimaryFilters());
+        } else {
+          existingEntity.addPrimaryFilters(entity.getPrimaryFilters());
+        }
+      }
+      if (entity.getOtherInfo() != null) {
+        if (existingEntity.getOtherInfo() == null) {
+          existingEntity.setOtherInfo(entity.getOtherInfo());
+        } else {
+          existingEntity.addOtherInfo(entity.getOtherInfo());
+        }
+      }
+      // relate it to other entities
+      if (entity.getRelatedEntities() == null) {
+        continue;
+      }
+      for (Map.Entry<String, List<String>> partRelatedEntities : entity
+          .getRelatedEntities().entrySet()) {
+        if (partRelatedEntities == null) {
+          continue;
+        }
+        for (String idStr : partRelatedEntities.getValue()) {
+          EntityId relatedEntityId =
+              new EntityId(idStr, partRelatedEntities.getKey());
+          ATSEntity relatedEntity = entities.get(relatedEntityId);
+          if (relatedEntity != null) {
+            relatedEntity.addRelatedEntity(
+                existingEntity.getEntityType(), existingEntity.getEntityId());
+          }
+        }
+      }
+    }
+    return errors;
+  }
+
+  private static ATSEntity maskFields(
+      ATSEntity entity, EnumSet<Field> fields) {
+    // Conceal the fields that are not going to be exposed
+    ATSEntity entityToReturn = new ATSEntity();
+    entityToReturn.setEntityId(entity.getEntityId());
+    entityToReturn.setEntityType(entity.getEntityType());
+    entityToReturn.setStartTime(entity.getStartTime());
+    entityToReturn.setEvents(fields.contains(Field.EVENTS) ?
+        entity.getEvents() : fields.contains(Field.LAST_EVENT_ONLY) ?
+            Arrays.asList(entity.getEvents().get(0)) : null);
+    entityToReturn.setRelatedEntities(fields.contains(Field.RELATED_ENTITIES) ?
+        entity.getRelatedEntities() : null);
+    entityToReturn.setPrimaryFilters(fields.contains(Field.PRIMARY_FILTERS) ?
+        entity.getPrimaryFilters() : null);
+    entityToReturn.setOtherInfo(fields.contains(Field.OTHER_INFO) ?
+        entity.getOtherInfo() : null);
+    return entityToReturn;
+  }
+
+  private static boolean matchFilter(Map<String, Object> tags,
+      NameValuePair filter) {
+    Object value = tags.get(filter.getName());
+    if (value == null) { // doesn't have the filter
+      return false;
+    } else if (!value.equals(filter.getValue())) { // doesn't match the filter
+      return false;
+    }
+    return true;
+  }
+
+}

+ 532 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/ApplicationTimelineStoreTestUtils.java

@@ -0,0 +1,532 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntities;
+import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntity;
+import org.apache.hadoop.yarn.api.records.apptimeline.ATSEvent;
+import org.apache.hadoop.yarn.api.records.apptimeline.ATSEvents.ATSEventsOfOneEntity;
+import org.apache.hadoop.yarn.api.records.apptimeline.ATSPutErrors;
+import org.apache.hadoop.yarn.api.records.apptimeline.ATSPutErrors.ATSPutError;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline.ApplicationTimelineReader.Field;
+
+public class ApplicationTimelineStoreTestUtils {
+
+  private static final Map<String, Object> EMPTY_MAP = Collections.emptyMap();
+  private static final Map<String, List<String>> EMPTY_REL_ENTITIES =
+      new HashMap<String, List<String>>();
+
+  protected ApplicationTimelineStore store;
+  private String entity1;
+  private String entityType1;
+  private String entity1b;
+  private String entity2;
+  private String entityType2;
+  private Map<String, Object> primaryFilters;
+  private Map<String, Object> secondaryFilters;
+  private Map<String, Object> allFilters;
+  private Map<String, Object> otherInfo;
+  private Map<String, List<String>> relEntityMap;
+  private NameValuePair userFilter;
+  private Collection<NameValuePair> goodTestingFilters;
+  private Collection<NameValuePair> badTestingFilters;
+  private ATSEvent ev1;
+  private ATSEvent ev2;
+  private ATSEvent ev3;
+  private ATSEvent ev4;
+  private Map<String, Object> eventInfo;
+  private List<ATSEvent> events1;
+  private List<ATSEvent> events2;
+
+  /**
+   * Load test data into the given store
+   */
+  protected void loadTestData() {
+    ATSEntities atsEntities = new ATSEntities();
+    Map<String, Object> primaryFilters = new HashMap<String, Object>();
+    primaryFilters.put("user", "username");
+    primaryFilters.put("appname", 12345l);
+    Map<String, Object> secondaryFilters = new HashMap<String, Object>();
+    secondaryFilters.put("startTime", 123456l);
+    secondaryFilters.put("status", "RUNNING");
+    Map<String, Object> otherInfo1 = new HashMap<String, Object>();
+    otherInfo1.put("info1", "val1");
+    otherInfo1.putAll(secondaryFilters);
+
+    String entity1 = "id_1";
+    String entityType1 = "type_1";
+    String entity1b = "id_2";
+    String entity2 = "id_2";
+    String entityType2 = "type_2";
+
+    Map<String, List<String>> relatedEntities =
+        new HashMap<String, List<String>>();
+    relatedEntities.put(entityType2, Collections.singletonList(entity2));
+
+    ATSEvent ev3 = createEvent(789l, "launch_event", null);
+    ATSEvent ev4 = createEvent(-123l, "init_event", null);
+    List<ATSEvent> events = new ArrayList<ATSEvent>();
+    events.add(ev3);
+    events.add(ev4);
+    atsEntities.setEntities(Collections.singletonList(createEntity(entity2,
+        entityType2, null, events, null, null, null)));
+    ATSPutErrors response = store.put(atsEntities);
+    assertEquals(0, response.getErrors().size());
+
+    ATSEvent ev1 = createEvent(123l, "start_event", null);
+    atsEntities.setEntities(Collections.singletonList(createEntity(entity1,
+        entityType1, 123l, Collections.singletonList(ev1),
+        relatedEntities, primaryFilters, otherInfo1)));
+    response = store.put(atsEntities);
+    assertEquals(0, response.getErrors().size());
+    atsEntities.setEntities(Collections.singletonList(createEntity(entity1b,
+        entityType1, null, Collections.singletonList(ev1), relatedEntities,
+        primaryFilters, otherInfo1)));
+    response = store.put(atsEntities);
+    assertEquals(0, response.getErrors().size());
+
+    Map<String, Object> eventInfo = new HashMap<String, Object>();
+    eventInfo.put("event info 1", "val1");
+    ATSEvent ev2 = createEvent(456l, "end_event", eventInfo);
+    Map<String, Object> otherInfo2 = new HashMap<String, Object>();
+    otherInfo2.put("info2", "val2");
+    atsEntities.setEntities(Collections.singletonList(createEntity(entity1,
+        entityType1, null, Collections.singletonList(ev2), null,
+        primaryFilters, otherInfo2)));
+    response = store.put(atsEntities);
+    assertEquals(0, response.getErrors().size());
+    atsEntities.setEntities(Collections.singletonList(createEntity(entity1b,
+        entityType1, 123l, Collections.singletonList(ev2), null,
+        primaryFilters, otherInfo2)));
+    response = store.put(atsEntities);
+    assertEquals(0, response.getErrors().size());
+
+    atsEntities.setEntities(Collections.singletonList(createEntity(
+        "badentityid", "badentity", null, null, null, null, otherInfo1)));
+    response = store.put(atsEntities);
+    assertEquals(1, response.getErrors().size());
+    ATSPutError error = response.getErrors().get(0);
+    assertEquals("badentityid", error.getEntityId());
+    assertEquals("badentity", error.getEntityType());
+    assertEquals((Integer) 1, error.getErrorCode());
+  }
+
+  /**
+   * Load veification data
+   */
+  protected void loadVerificationData() throws Exception {
+    userFilter = new NameValuePair("user",
+        "username");
+    goodTestingFilters = new ArrayList<NameValuePair>();
+    goodTestingFilters.add(new NameValuePair("appname", 12345l));
+    goodTestingFilters.add(new NameValuePair("status", "RUNNING"));
+    badTestingFilters = new ArrayList<NameValuePair>();
+    badTestingFilters.add(new NameValuePair("appname", 12345l));
+    badTestingFilters.add(new NameValuePair("status", "FINISHED"));
+
+    primaryFilters = new HashMap<String, Object>();
+    primaryFilters.put("user", "username");
+    primaryFilters.put("appname", 12345l);
+    secondaryFilters = new HashMap<String, Object>();
+    secondaryFilters.put("startTime", 123456l);
+    secondaryFilters.put("status", "RUNNING");
+    allFilters = new HashMap<String, Object>();
+    allFilters.putAll(secondaryFilters);
+    allFilters.putAll(primaryFilters);
+    otherInfo = new HashMap<String, Object>();
+    otherInfo.put("info1", "val1");
+    otherInfo.put("info2", "val2");
+    otherInfo.putAll(secondaryFilters);
+
+    entity1 = "id_1";
+    entityType1 = "type_1";
+    entity1b = "id_2";
+    entity2 = "id_2";
+    entityType2 = "type_2";
+
+    ev1 = createEvent(123l, "start_event", null);
+
+    eventInfo = new HashMap<String, Object>();
+    eventInfo.put("event info 1", "val1");
+    ev2 = createEvent(456l, "end_event", eventInfo);
+    events1 = new ArrayList<ATSEvent>();
+    events1.add(ev2);
+    events1.add(ev1);
+
+    relEntityMap =
+        new HashMap<String, List<String>>();
+    List<String> ids = new ArrayList<String>();
+    ids.add(entity1);
+    ids.add(entity1b);
+    relEntityMap.put(entityType1, ids);
+
+    ev3 = createEvent(789l, "launch_event", null);
+    ev4 = createEvent(-123l, "init_event", null);
+    events2 = new ArrayList<ATSEvent>();
+    events2.add(ev3);
+    events2.add(ev4);
+  }
+
+  public void testGetSingleEntity() {
+    // test getting entity info
+    verifyEntityInfo(null, null, null, null, null, null,
+        store.getEntity("id_1", "type_2", EnumSet.allOf(Field.class)));
+
+    verifyEntityInfo(entity1, entityType1, events1, EMPTY_REL_ENTITIES,
+        primaryFilters, otherInfo, store.getEntity(entity1, entityType1,
+        EnumSet.allOf(Field.class)));
+
+    verifyEntityInfo(entity1b, entityType1, events1, EMPTY_REL_ENTITIES,
+        primaryFilters, otherInfo, store.getEntity(entity1b, entityType1,
+        EnumSet.allOf(Field.class)));
+
+    verifyEntityInfo(entity2, entityType2, events2, relEntityMap, EMPTY_MAP,
+        EMPTY_MAP, store.getEntity(entity2, entityType2,
+        EnumSet.allOf(Field.class)));
+
+    // test getting single fields
+    verifyEntityInfo(entity1, entityType1, events1, null, null, null,
+        store.getEntity(entity1, entityType1, EnumSet.of(Field.EVENTS)));
+
+    verifyEntityInfo(entity1, entityType1, Collections.singletonList(ev2),
+        null, null, null, store.getEntity(entity1, entityType1,
+        EnumSet.of(Field.LAST_EVENT_ONLY)));
+
+    verifyEntityInfo(entity1, entityType1, null, null, primaryFilters, null,
+        store.getEntity(entity1, entityType1,
+            EnumSet.of(Field.PRIMARY_FILTERS)));
+
+    verifyEntityInfo(entity1, entityType1, null, null, null, otherInfo,
+        store.getEntity(entity1, entityType1, EnumSet.of(Field.OTHER_INFO)));
+
+    verifyEntityInfo(entity2, entityType2, null, relEntityMap, null, null,
+        store.getEntity(entity2, entityType2,
+            EnumSet.of(Field.RELATED_ENTITIES)));
+  }
+
+  public void testGetEntities() {
+    // test getting entities
+    assertEquals("nonzero entities size for nonexistent type", 0,
+        store.getEntities("type_0", null, null, null, null, null,
+            null).getEntities().size());
+    assertEquals("nonzero entities size for nonexistent type", 0,
+        store.getEntities("type_3", null, null, null, null, null,
+            null).getEntities().size());
+    assertEquals("nonzero entities size for nonexistent type", 0,
+        store.getEntities("type_0", null, null, null, userFilter,
+            null, null).getEntities().size());
+    assertEquals("nonzero entities size for nonexistent type", 0,
+        store.getEntities("type_3", null, null, null, userFilter,
+            null, null).getEntities().size());
+
+    List<ATSEntity> entities =
+        store.getEntities("type_1", null, null, null, null, null,
+            EnumSet.allOf(Field.class)).getEntities();
+    assertEquals(2, entities.size());
+    verifyEntityInfo(entity1, entityType1, events1, EMPTY_REL_ENTITIES,
+        primaryFilters, otherInfo, entities.get(0));
+    verifyEntityInfo(entity1b, entityType1, events1, EMPTY_REL_ENTITIES,
+        primaryFilters, otherInfo, entities.get(1));
+
+    entities = store.getEntities("type_2", null, null, null, null, null,
+        EnumSet.allOf(Field.class)).getEntities();
+    assertEquals(1, entities.size());
+    verifyEntityInfo(entity2, entityType2, events2, relEntityMap, EMPTY_MAP,
+        EMPTY_MAP, entities.get(0));
+
+    entities = store.getEntities("type_1", 1l, null, null, null, null,
+        EnumSet.allOf(Field.class)).getEntities();
+    assertEquals(1, entities.size());
+    verifyEntityInfo(entity1, entityType1, events1, EMPTY_REL_ENTITIES,
+        primaryFilters, otherInfo, entities.get(0));
+
+    entities = store.getEntities("type_1", 1l, 0l, null, null, null,
+        EnumSet.allOf(Field.class)).getEntities();
+    assertEquals(1, entities.size());
+    verifyEntityInfo(entity1, entityType1, events1, EMPTY_REL_ENTITIES,
+        primaryFilters, otherInfo, entities.get(0));
+
+    entities = store.getEntities("type_1", null, 234l, null, null, null,
+        EnumSet.allOf(Field.class)).getEntities();
+    assertEquals(0, entities.size());
+
+    entities = store.getEntities("type_1", null, 123l, null, null, null,
+        EnumSet.allOf(Field.class)).getEntities();
+    assertEquals(0, entities.size());
+
+    entities = store.getEntities("type_1", null, 234l, 345l, null, null,
+        EnumSet.allOf(Field.class)).getEntities();
+    assertEquals(0, entities.size());
+
+    entities = store.getEntities("type_1", null, null, 345l, null, null,
+        EnumSet.allOf(Field.class)).getEntities();
+    assertEquals(2, entities.size());
+    verifyEntityInfo(entity1, entityType1, events1, EMPTY_REL_ENTITIES,
+        primaryFilters, otherInfo, entities.get(0));
+    verifyEntityInfo(entity1b, entityType1, events1, EMPTY_REL_ENTITIES,
+        primaryFilters, otherInfo, entities.get(1));
+
+    entities = store.getEntities("type_1", null, null, 123l, null, null,
+        EnumSet.allOf(Field.class)).getEntities();
+    assertEquals(2, entities.size());
+    verifyEntityInfo(entity1, entityType1, events1, EMPTY_REL_ENTITIES,
+        primaryFilters, otherInfo, entities.get(0));
+    verifyEntityInfo(entity1b, entityType1, events1, EMPTY_REL_ENTITIES,
+        primaryFilters, otherInfo, entities.get(1));
+  }
+
+  public void testGetEntitiesWithPrimaryFilters() {
+    // test using primary filter
+    assertEquals("nonzero entities size for primary filter", 0,
+        store.getEntities("type_1", null, null, null,
+            new NameValuePair("none", "none"), null,
+            EnumSet.allOf(Field.class)).getEntities().size());
+    assertEquals("nonzero entities size for primary filter", 0,
+        store.getEntities("type_2", null, null, null,
+            new NameValuePair("none", "none"), null,
+            EnumSet.allOf(Field.class)).getEntities().size());
+    assertEquals("nonzero entities size for primary filter", 0,
+        store.getEntities("type_3", null, null, null,
+            new NameValuePair("none", "none"), null,
+            EnumSet.allOf(Field.class)).getEntities().size());
+
+    List<ATSEntity> entities = store.getEntities("type_1", null, null, null,
+        userFilter, null, EnumSet.allOf(Field.class)).getEntities();
+    assertEquals(2, entities.size());
+    verifyEntityInfo(entity1, entityType1, events1, EMPTY_REL_ENTITIES,
+        primaryFilters, otherInfo, entities.get(0));
+    verifyEntityInfo(entity1b, entityType1, events1, EMPTY_REL_ENTITIES,
+        primaryFilters, otherInfo, entities.get(1));
+
+    entities = store.getEntities("type_2", null, null, null, userFilter, null,
+        EnumSet.allOf(Field.class)).getEntities();
+    assertEquals(0, entities.size());
+
+    entities = store.getEntities("type_1", 1l, null, null, userFilter, null,
+        EnumSet.allOf(Field.class)).getEntities();
+    assertEquals(1, entities.size());
+    verifyEntityInfo(entity1, entityType1, events1, EMPTY_REL_ENTITIES,
+        primaryFilters, otherInfo, entities.get(0));
+
+    entities = store.getEntities("type_1", 1l, 0l, null, userFilter, null,
+        EnumSet.allOf(Field.class)).getEntities();
+    assertEquals(1, entities.size());
+    verifyEntityInfo(entity1, entityType1, events1, EMPTY_REL_ENTITIES,
+        primaryFilters, otherInfo, entities.get(0));
+
+    entities = store.getEntities("type_1", null, 234l, null, userFilter, null,
+        EnumSet.allOf(Field.class)).getEntities();
+    assertEquals(0, entities.size());
+
+    entities = store.getEntities("type_1", null, 234l, 345l, userFilter, null,
+        EnumSet.allOf(Field.class)).getEntities();
+    assertEquals(0, entities.size());
+
+    entities = store.getEntities("type_1", null, null, 345l, userFilter, null,
+        EnumSet.allOf(Field.class)).getEntities();
+    assertEquals(2, entities.size());
+    verifyEntityInfo(entity1, entityType1, events1, EMPTY_REL_ENTITIES,
+        primaryFilters, otherInfo, entities.get(0));
+    verifyEntityInfo(entity1b, entityType1, events1, EMPTY_REL_ENTITIES,
+        primaryFilters, otherInfo, entities.get(1));
+  }
+
+  public void testGetEntitiesWithSecondaryFilters() {
+    // test using secondary filter
+    List<ATSEntity> entities = store.getEntities("type_1", null, null, null,
+        null, goodTestingFilters, EnumSet.allOf(Field.class)).getEntities();
+    assertEquals(2, entities.size());
+    verifyEntityInfo(entity1, entityType1, events1, EMPTY_REL_ENTITIES,
+        primaryFilters, otherInfo, entities.get(0));
+    verifyEntityInfo(entity1b, entityType1, events1, EMPTY_REL_ENTITIES,
+        primaryFilters, otherInfo, entities.get(1));
+
+    entities = store.getEntities("type_1", null, null, null, userFilter,
+        goodTestingFilters, EnumSet.allOf(Field.class)).getEntities();
+    assertEquals(2, entities.size());
+    verifyEntityInfo(entity1, entityType1, events1, EMPTY_REL_ENTITIES,
+        primaryFilters, otherInfo, entities.get(0));
+    verifyEntityInfo(entity1b, entityType1, events1, EMPTY_REL_ENTITIES,
+        primaryFilters, otherInfo, entities.get(1));
+
+    entities = store.getEntities("type_1", null, null, null, null,
+        badTestingFilters, EnumSet.allOf(Field.class)).getEntities();
+    assertEquals(0, entities.size());
+
+    entities = store.getEntities("type_1", null, null, null, userFilter,
+        badTestingFilters, EnumSet.allOf(Field.class)).getEntities();
+    assertEquals(0, entities.size());
+  }
+
+  public void testGetEvents() {
+    // test getting entity timelines
+    SortedSet<String> sortedSet = new TreeSet<String>();
+    sortedSet.add(entity1);
+    List<ATSEventsOfOneEntity> timelines =
+        store.getEntityTimelines(entityType1, sortedSet, null, null,
+            null, null).getAllEvents();
+    assertEquals(1, timelines.size());
+    verifyEntityTimeline(timelines.get(0), entity1, entityType1, ev2, ev1);
+
+    sortedSet.add(entity1b);
+    timelines = store.getEntityTimelines(entityType1, sortedSet, null,
+        null, null, null).getAllEvents();
+    assertEquals(2, timelines.size());
+    verifyEntityTimeline(timelines.get(0), entity1, entityType1, ev2, ev1);
+    verifyEntityTimeline(timelines.get(1), entity1b, entityType1, ev2, ev1);
+
+    timelines = store.getEntityTimelines(entityType1, sortedSet, 1l,
+        null, null, null).getAllEvents();
+    assertEquals(2, timelines.size());
+    verifyEntityTimeline(timelines.get(0), entity1, entityType1, ev2);
+    verifyEntityTimeline(timelines.get(1), entity1b, entityType1, ev2);
+
+    timelines = store.getEntityTimelines(entityType1, sortedSet, null,
+        345l, null, null).getAllEvents();
+    assertEquals(2, timelines.size());
+    verifyEntityTimeline(timelines.get(0), entity1, entityType1, ev2);
+    verifyEntityTimeline(timelines.get(1), entity1b, entityType1, ev2);
+
+    timelines = store.getEntityTimelines(entityType1, sortedSet, null,
+        123l, null, null).getAllEvents();
+    assertEquals(2, timelines.size());
+    verifyEntityTimeline(timelines.get(0), entity1, entityType1, ev2);
+    verifyEntityTimeline(timelines.get(1), entity1b, entityType1, ev2);
+
+    timelines = store.getEntityTimelines(entityType1, sortedSet, null,
+        null, 345l, null).getAllEvents();
+    assertEquals(2, timelines.size());
+    verifyEntityTimeline(timelines.get(0), entity1, entityType1, ev1);
+    verifyEntityTimeline(timelines.get(1), entity1b, entityType1, ev1);
+
+    timelines = store.getEntityTimelines(entityType1, sortedSet, null,
+        null, 123l, null).getAllEvents();
+    assertEquals(2, timelines.size());
+    verifyEntityTimeline(timelines.get(0), entity1, entityType1, ev1);
+    verifyEntityTimeline(timelines.get(1), entity1b, entityType1, ev1);
+
+    timelines = store.getEntityTimelines(entityType1, sortedSet, null,
+        null, null, Collections.singleton("end_event")).getAllEvents();
+    assertEquals(2, timelines.size());
+    verifyEntityTimeline(timelines.get(0), entity1, entityType1, ev2);
+    verifyEntityTimeline(timelines.get(1), entity1b, entityType1, ev2);
+
+    sortedSet.add(entity2);
+    timelines = store.getEntityTimelines(entityType2, sortedSet, null,
+        null, null, null).getAllEvents();
+    assertEquals(1, timelines.size());
+    verifyEntityTimeline(timelines.get(0), entity2, entityType2, ev3, ev4);
+  }
+
+  /**
+   * Verify a single entity
+   */
+  private static void verifyEntityInfo(String entity, String entityType,
+      List<ATSEvent> events, Map<String, List<String>> relatedEntities,
+      Map<String, Object> primaryFilters, Map<String, Object> otherInfo,
+      ATSEntity retrievedEntityInfo) {
+    if (entity == null) {
+      assertNull(retrievedEntityInfo);
+      return;
+    }
+    assertEquals(entity, retrievedEntityInfo.getEntityId());
+    assertEquals(entityType, retrievedEntityInfo.getEntityType());
+    if (events == null)
+      assertNull(retrievedEntityInfo.getEvents());
+    else
+      assertEquals(events, retrievedEntityInfo.getEvents());
+    if (relatedEntities == null)
+      assertNull(retrievedEntityInfo.getRelatedEntities());
+    else
+      assertEquals(relatedEntities, retrievedEntityInfo.getRelatedEntities());
+    if (primaryFilters == null)
+      assertNull(retrievedEntityInfo.getPrimaryFilters());
+    else
+      assertTrue(primaryFilters.equals(
+          retrievedEntityInfo.getPrimaryFilters()));
+    if (otherInfo == null)
+      assertNull(retrievedEntityInfo.getOtherInfo());
+    else
+      assertTrue(otherInfo.equals(retrievedEntityInfo.getOtherInfo()));
+  }
+
+  /**
+   * Verify timeline events
+   */
+  private static void verifyEntityTimeline(
+      ATSEventsOfOneEntity retrievedEvents, String entity, String entityType,
+      ATSEvent... actualEvents) {
+    assertEquals(entity, retrievedEvents.getEntityId());
+    assertEquals(entityType, retrievedEvents.getEntityType());
+    assertEquals(actualEvents.length, retrievedEvents.getEvents().size());
+    for (int i = 0; i < actualEvents.length; i++) {
+      assertEquals(actualEvents[i], retrievedEvents.getEvents().get(i));
+    }
+  }
+
+  /**
+   * Create a test entity
+   */
+  private static ATSEntity createEntity(String entity, String entityType,
+      Long startTime, List<ATSEvent> events,
+      Map<String, List<String>> relatedEntities,
+      Map<String, Object> primaryFilters, Map<String, Object> otherInfo) {
+    ATSEntity atsEntity = new ATSEntity();
+    atsEntity.setEntityId(entity);
+    atsEntity.setEntityType(entityType);
+    atsEntity.setStartTime(startTime);
+    atsEntity.setEvents(events);
+    if (relatedEntities != null)
+      for (Entry<String, List<String>> e : relatedEntities.entrySet())
+        for (String v : e.getValue())
+          atsEntity.addRelatedEntity(e.getKey(), v);
+    else
+      atsEntity.setRelatedEntities(null);
+    atsEntity.setPrimaryFilters(primaryFilters);
+    atsEntity.setOtherInfo(otherInfo);
+    return atsEntity;
+  }
+
+  /**
+   * Create a test event
+   */
+  private static ATSEvent createEvent(long timestamp, String type, Map<String,
+      Object> info) {
+    ATSEvent event = new ATSEvent();
+    event.setTimestamp(timestamp);
+    event.setEventType(type);
+    event.setEventInfo(info);
+    return event;
+  }
+
+}

+ 73 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/TestMemoryApplicationTimelineStore.java

@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline;
+
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class TestMemoryApplicationTimelineStore
+    extends ApplicationTimelineStoreTestUtils {
+
+  @Before
+  public void setup() throws Exception {
+    store = new MemoryApplicationTimelineStore();
+    store.init(new YarnConfiguration());
+    store.start();
+    loadTestData();
+    loadVerificationData();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    store.stop();
+  }
+
+  public ApplicationTimelineStore getApplicationTimelineStore() {
+    return store;
+  }
+
+  @Test
+  public void testGetSingleEntity() {
+    super.testGetSingleEntity();
+  }
+
+  @Test
+  public void testGetEntities() {
+    super.testGetEntities();
+  }
+
+  @Test
+  public void testGetEntitiesWithPrimaryFilters() {
+    super.testGetEntitiesWithPrimaryFilters();
+  }
+
+  @Test
+  public void testGetEntitiesWithSecondaryFilters() {
+    super.testGetEntitiesWithSecondaryFilters();
+  }
+
+  @Test
+  public void testGetEvents() {
+    super.testGetEvents();
+  }
+
+}