|
@@ -19,46 +19,55 @@
|
|
|
package org.apache.hadoop.yarn.server.applicationhistoryservice;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
+import java.util.Collections;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.Map;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
+import java.util.concurrent.ConcurrentMap;
|
|
|
|
|
|
+import org.apache.hadoop.classification.InterfaceAudience.Private;
|
|
|
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
|
|
+import org.apache.hadoop.service.AbstractService;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
|
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptFinishData;
|
|
|
import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptHistoryData;
|
|
|
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptStartData;
|
|
|
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationFinishData;
|
|
|
import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationHistoryData;
|
|
|
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationStartData;
|
|
|
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerFinishData;
|
|
|
import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerHistoryData;
|
|
|
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerStartData;
|
|
|
|
|
|
-public class MemoryApplicationHistoryStore implements ApplicationHistoryStore {
|
|
|
-
|
|
|
- private static MemoryApplicationHistoryStore memStore = null;
|
|
|
+/**
|
|
|
+ * In-memory implementation of {@link ApplicationHistoryStore}.
|
|
|
+ * This implementation is for test purpose only. If users improperly instantiate
|
|
|
+ * it, they may encounter reading and writing history data in different memory
|
|
|
+ * store.
|
|
|
+ *
|
|
|
+ */
|
|
|
+@Private
|
|
|
+@Unstable
|
|
|
+public class MemoryApplicationHistoryStore extends AbstractService
|
|
|
+ implements ApplicationHistoryStore {
|
|
|
|
|
|
- private ConcurrentHashMap<ApplicationId, ApplicationHistoryData> applicationData =
|
|
|
+ private final ConcurrentMap<ApplicationId, ApplicationHistoryData> applicationData =
|
|
|
new ConcurrentHashMap<ApplicationId, ApplicationHistoryData>();
|
|
|
- private ConcurrentHashMap<ApplicationId, ConcurrentHashMap<ApplicationAttemptId, ApplicationAttemptHistoryData>> applicationAttemptData =
|
|
|
- new ConcurrentHashMap<ApplicationId, ConcurrentHashMap<ApplicationAttemptId, ApplicationAttemptHistoryData>>();
|
|
|
- private ConcurrentHashMap<ContainerId, ContainerHistoryData> containerData =
|
|
|
- new ConcurrentHashMap<ContainerId, ContainerHistoryData>();
|
|
|
+ private final ConcurrentMap<ApplicationId, ConcurrentMap<ApplicationAttemptId, ApplicationAttemptHistoryData>> applicationAttemptData =
|
|
|
+ new ConcurrentHashMap<ApplicationId, ConcurrentMap<ApplicationAttemptId, ApplicationAttemptHistoryData>>();
|
|
|
+ private final ConcurrentMap<ApplicationAttemptId, ConcurrentMap<ContainerId, ContainerHistoryData>> containerData =
|
|
|
+ new ConcurrentHashMap<ApplicationAttemptId, ConcurrentMap<ContainerId, ContainerHistoryData>>();
|
|
|
|
|
|
- private MemoryApplicationHistoryStore() {
|
|
|
- }
|
|
|
-
|
|
|
- public static MemoryApplicationHistoryStore getMemoryStore() {
|
|
|
- if (memStore == null) {
|
|
|
- memStore = new MemoryApplicationHistoryStore();
|
|
|
- }
|
|
|
- return memStore;
|
|
|
+ public MemoryApplicationHistoryStore() {
|
|
|
+ super(MemoryApplicationHistoryStore.class.getName());
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public Map<ApplicationId, ApplicationHistoryData> getAllApplications() {
|
|
|
- Map<ApplicationId, ApplicationHistoryData> listApps =
|
|
|
- new HashMap<ApplicationId, ApplicationHistoryData>();
|
|
|
- for (ApplicationId appId : applicationData.keySet()) {
|
|
|
- listApps.put(appId, applicationData.get(appId));
|
|
|
- }
|
|
|
- return listApps;
|
|
|
+ return new HashMap<ApplicationId, ApplicationHistoryData>(
|
|
|
+ applicationData);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -67,111 +76,209 @@ public class MemoryApplicationHistoryStore implements ApplicationHistoryStore {
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public Map<ApplicationAttemptId, ApplicationAttemptHistoryData> getApplicationAttempts(
|
|
|
- ApplicationId appId) {
|
|
|
- Map<ApplicationAttemptId, ApplicationAttemptHistoryData> listAttempts =
|
|
|
- null;
|
|
|
- ConcurrentHashMap<ApplicationAttemptId, ApplicationAttemptHistoryData> appAttempts =
|
|
|
+ public Map<ApplicationAttemptId, ApplicationAttemptHistoryData>
|
|
|
+ getApplicationAttempts(
|
|
|
+ ApplicationId appId) {
|
|
|
+ ConcurrentMap<ApplicationAttemptId, ApplicationAttemptHistoryData> subMap =
|
|
|
applicationAttemptData.get(appId);
|
|
|
- if (appAttempts != null) {
|
|
|
- listAttempts =
|
|
|
- new HashMap<ApplicationAttemptId, ApplicationAttemptHistoryData>();
|
|
|
- for (ApplicationAttemptId attemptId : appAttempts.keySet()) {
|
|
|
- listAttempts.put(attemptId, appAttempts.get(attemptId));
|
|
|
- }
|
|
|
+ if (subMap == null) {
|
|
|
+ return Collections.<ApplicationAttemptId, ApplicationAttemptHistoryData>emptyMap();
|
|
|
+ } else {
|
|
|
+ return new HashMap<ApplicationAttemptId, ApplicationAttemptHistoryData>(subMap);
|
|
|
}
|
|
|
- return listAttempts;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public ApplicationAttemptHistoryData getApplicationAttempt(
|
|
|
ApplicationAttemptId appAttemptId) {
|
|
|
- ApplicationAttemptHistoryData appAttemptHistoryData = null;
|
|
|
- ConcurrentHashMap<ApplicationAttemptId, ApplicationAttemptHistoryData> appAttempts =
|
|
|
+ ConcurrentMap<ApplicationAttemptId, ApplicationAttemptHistoryData> subMap =
|
|
|
applicationAttemptData.get(appAttemptId.getApplicationId());
|
|
|
- if (appAttempts != null) {
|
|
|
- appAttemptHistoryData = appAttempts.get(appAttemptId);
|
|
|
+ if (subMap == null) {
|
|
|
+ return null;
|
|
|
+ } else {
|
|
|
+ return subMap.get(appAttemptId);
|
|
|
}
|
|
|
- return appAttemptHistoryData;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public ContainerHistoryData getAMContainer(ApplicationAttemptId appAttemptId) {
|
|
|
- ContainerHistoryData Container = null;
|
|
|
- ConcurrentHashMap<ApplicationAttemptId, ApplicationAttemptHistoryData> appAttempts =
|
|
|
- applicationAttemptData.get(appAttemptId.getApplicationId());
|
|
|
- if (appAttempts != null) {
|
|
|
- containerData.get(appAttempts.get(appAttemptId).getMasterContainerId());
|
|
|
+ public ContainerHistoryData getAMContainer(
|
|
|
+ ApplicationAttemptId appAttemptId) {
|
|
|
+ ApplicationAttemptHistoryData appAttempt =
|
|
|
+ getApplicationAttempt(appAttemptId);
|
|
|
+ if (appAttempt == null || appAttempt.getMasterContainerId() == null) {
|
|
|
+ return null;
|
|
|
+ } else {
|
|
|
+ return getContainer(appAttempt.getMasterContainerId());
|
|
|
}
|
|
|
- return Container;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public ContainerHistoryData getContainer(ContainerId containerId) {
|
|
|
- return containerData.get(containerId);
|
|
|
+ Map<ContainerId, ContainerHistoryData> subMap =
|
|
|
+ containerData.get(containerId.getApplicationAttemptId());
|
|
|
+ if (subMap == null) {
|
|
|
+ return null;
|
|
|
+ } else {
|
|
|
+ return subMap.get(containerId);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public void writeApplication(ApplicationHistoryData app) throws Throwable {
|
|
|
- if (app != null) {
|
|
|
- ApplicationHistoryData oldData =
|
|
|
- applicationData.putIfAbsent(app.getApplicationId(), app);
|
|
|
- if (oldData != null) {
|
|
|
- throw new IOException("This application "
|
|
|
- + app.getApplicationId().toString() + " is already present.");
|
|
|
- }
|
|
|
+ public Map<ContainerId, ContainerHistoryData> getContainers(
|
|
|
+ ApplicationAttemptId appAttemptId) throws IOException {
|
|
|
+ ConcurrentMap<ContainerId, ContainerHistoryData> subMap =
|
|
|
+ containerData.get(appAttemptId);
|
|
|
+ if (subMap == null) {
|
|
|
+ return Collections.<ContainerId, ContainerHistoryData>emptyMap();
|
|
|
+ } else {
|
|
|
+ return new HashMap<ContainerId, ContainerHistoryData>(subMap);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public void writeApplicationAttempt(ApplicationAttemptHistoryData appAttempt)
|
|
|
- throws Throwable {
|
|
|
- if (appAttempt != null) {
|
|
|
- if (applicationAttemptData.containsKey(appAttempt
|
|
|
- .getApplicationAttemptId().getApplicationId())) {
|
|
|
- ConcurrentHashMap<ApplicationAttemptId, ApplicationAttemptHistoryData> appAttemptmap =
|
|
|
- applicationAttemptData.get(appAttempt.getApplicationAttemptId()
|
|
|
- .getApplicationId());
|
|
|
- ApplicationAttemptHistoryData oldAppAttempt =
|
|
|
- appAttemptmap.putIfAbsent(appAttempt.getApplicationAttemptId(),
|
|
|
- appAttempt);
|
|
|
- if (oldAppAttempt != null) {
|
|
|
- throw new IOException("This application attempt "
|
|
|
- + appAttempt.getApplicationAttemptId().toString()
|
|
|
- + " already present.");
|
|
|
- }
|
|
|
- } else {
|
|
|
- ConcurrentHashMap<ApplicationAttemptId, ApplicationAttemptHistoryData> appAttemptmap =
|
|
|
- new ConcurrentHashMap<ApplicationAttemptId, ApplicationAttemptHistoryData>();
|
|
|
- appAttemptmap.put(appAttempt.getApplicationAttemptId(), appAttempt);
|
|
|
- applicationAttemptData.putIfAbsent(appAttempt.getApplicationAttemptId()
|
|
|
- .getApplicationId(), appAttemptmap);
|
|
|
- }
|
|
|
+ public void applicationStarted(ApplicationStartData appStart)
|
|
|
+ throws IOException {
|
|
|
+ ApplicationHistoryData oldData =
|
|
|
+ applicationData.putIfAbsent(appStart.getApplicationId(),
|
|
|
+ ApplicationHistoryData.newInstance(
|
|
|
+ appStart.getApplicationId(),
|
|
|
+ appStart.getApplicationName(),
|
|
|
+ appStart.getApplicationType(),
|
|
|
+ appStart.getQueue(),
|
|
|
+ appStart.getUser(),
|
|
|
+ appStart.getSubmitTime(),
|
|
|
+ appStart.getStartTime(),
|
|
|
+ Long.MAX_VALUE, null, null, null));
|
|
|
+ if (oldData != null) {
|
|
|
+ throw new IOException("The start information of application "
|
|
|
+ + appStart.getApplicationId() + " is already stored.");
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public void writeContainer(ContainerHistoryData container) throws Throwable {
|
|
|
- if (container != null) {
|
|
|
- ContainerHistoryData oldContainer =
|
|
|
- containerData.putIfAbsent(container.getContainerId(), container);
|
|
|
- if (oldContainer != null) {
|
|
|
- throw new IOException("This container "
|
|
|
- + container.getContainerId().toString() + " is already present.");
|
|
|
- }
|
|
|
+ public void applicationFinished(ApplicationFinishData appFinish)
|
|
|
+ throws IOException {
|
|
|
+ ApplicationHistoryData data =
|
|
|
+ applicationData.get(appFinish.getApplicationId());
|
|
|
+ if (data == null) {
|
|
|
+ throw new IOException("The finish information of application "
|
|
|
+ + appFinish.getApplicationId() + " is stored before the start"
|
|
|
+ + " information.");
|
|
|
}
|
|
|
+ // Make the assumption that YarnApplicationState should not be null if
|
|
|
+ // the finish information is already recorded
|
|
|
+ if (data.getYarnApplicationState() != null) {
|
|
|
+ throw new IOException("The finish information of application "
|
|
|
+ + appFinish.getApplicationId() + " is already stored.");
|
|
|
+ }
|
|
|
+ data.setFinishTime(appFinish.getFinishTime());
|
|
|
+ data.setDiagnosticsInfo(appFinish.getDiagnosticsInfo());
|
|
|
+ data.setFinalApplicationStatus(appFinish.getFinalApplicationStatus());
|
|
|
+ data.setYarnApplicationState(appFinish.getYarnApplicationState());
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public Map<ContainerId, ContainerHistoryData> getContainers(
|
|
|
- ApplicationAttemptId appAttemptId) throws IOException {
|
|
|
- HashMap<ContainerId, ContainerHistoryData> containers =
|
|
|
- new HashMap<ContainerId, ContainerHistoryData>();
|
|
|
- for (ContainerId container : containerData.keySet()) {
|
|
|
- if (container.getApplicationAttemptId().equals(appAttemptId)) {
|
|
|
- containers.put(container, containerData.get(container));
|
|
|
- }
|
|
|
+ public void applicationAttemptStarted(
|
|
|
+ ApplicationAttemptStartData appAttemptStart) throws IOException {
|
|
|
+ ConcurrentMap<ApplicationAttemptId, ApplicationAttemptHistoryData> subMap =
|
|
|
+ getSubMap(appAttemptStart.getApplicationAttemptId().getApplicationId());
|
|
|
+ ApplicationAttemptHistoryData oldData = subMap.putIfAbsent(
|
|
|
+ appAttemptStart.getApplicationAttemptId(),
|
|
|
+ ApplicationAttemptHistoryData.newInstance(
|
|
|
+ appAttemptStart.getApplicationAttemptId(),
|
|
|
+ appAttemptStart.getHost(),
|
|
|
+ appAttemptStart.getRPCPort(),
|
|
|
+ appAttemptStart.getMasterContainerId(),
|
|
|
+ null, null, null, null));
|
|
|
+ if (oldData != null) {
|
|
|
+ throw new IOException("The start information of application attempt "
|
|
|
+ + appAttemptStart.getApplicationAttemptId()
|
|
|
+ + " is already stored.");
|
|
|
}
|
|
|
- return containers;
|
|
|
}
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void applicationAttemptFinished(
|
|
|
+ ApplicationAttemptFinishData appAttemptFinish) throws IOException {
|
|
|
+ ConcurrentMap<ApplicationAttemptId, ApplicationAttemptHistoryData> subMap =
|
|
|
+ getSubMap(appAttemptFinish.getApplicationAttemptId().getApplicationId());
|
|
|
+ ApplicationAttemptHistoryData data =
|
|
|
+ subMap.get(appAttemptFinish.getApplicationAttemptId());
|
|
|
+ if (data == null) {
|
|
|
+ throw new IOException("The finish information of application attempt "
|
|
|
+ + appAttemptFinish.getApplicationAttemptId() + " is stored before"
|
|
|
+ + " the start information.");
|
|
|
+ }
|
|
|
+ // Make the assumption that YarnApplicationAttemptState should not be null
|
|
|
+ // if the finish information is already recorded
|
|
|
+ if (data.getYarnApplicationAttemptState() != null) {
|
|
|
+ throw new IOException("The finish information of application attempt "
|
|
|
+ + appAttemptFinish.getApplicationAttemptId()
|
|
|
+ + " is already stored.");
|
|
|
+ }
|
|
|
+ data.setTrackingURL(appAttemptFinish.getTrackingURL());
|
|
|
+ data.setDiagnosticsInfo(appAttemptFinish.getDiagnosticsInfo());
|
|
|
+ data.setFinalApplicationStatus(appAttemptFinish.getFinalApplicationStatus());
|
|
|
+ data.setYarnApplicationAttemptState(appAttemptFinish.getYarnApplicationAttemptState());
|
|
|
+ }
|
|
|
+
|
|
|
+ private ConcurrentMap<ApplicationAttemptId, ApplicationAttemptHistoryData>
|
|
|
+ getSubMap(ApplicationId appId) {
|
|
|
+ applicationAttemptData.putIfAbsent(appId,
|
|
|
+ new ConcurrentHashMap<ApplicationAttemptId,
|
|
|
+ ApplicationAttemptHistoryData>());
|
|
|
+ return applicationAttemptData.get(appId);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void containerStarted(ContainerStartData containerStart)
|
|
|
+ throws IOException {
|
|
|
+ ConcurrentMap<ContainerId, ContainerHistoryData> subMap =
|
|
|
+ getSubMap(containerStart.getContainerId().getApplicationAttemptId());
|
|
|
+ ContainerHistoryData oldData = subMap.putIfAbsent(
|
|
|
+ containerStart.getContainerId(),
|
|
|
+ ContainerHistoryData.newInstance(
|
|
|
+ containerStart.getContainerId(),
|
|
|
+ containerStart.getAllocatedResource(),
|
|
|
+ containerStart.getAssignedNode(),
|
|
|
+ containerStart.getPriority(),
|
|
|
+ containerStart.getStartTime(),
|
|
|
+ Long.MAX_VALUE, null, null, Integer.MAX_VALUE, null));
|
|
|
+ if (oldData != null) {
|
|
|
+ throw new IOException("The start information of container "
|
|
|
+ + containerStart.getContainerId() + " is already stored.");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void containerFinished(ContainerFinishData containerFinish)
|
|
|
+ throws IOException {
|
|
|
+ ConcurrentMap<ContainerId, ContainerHistoryData> subMap =
|
|
|
+ getSubMap(containerFinish.getContainerId().getApplicationAttemptId());
|
|
|
+ ContainerHistoryData data = subMap.get(containerFinish.getContainerId());
|
|
|
+ if (data == null) {
|
|
|
+ throw new IOException("The finish information of container "
|
|
|
+ + containerFinish.getContainerId() + " is stored before"
|
|
|
+ + " the start information.");
|
|
|
+ }
|
|
|
+ // Make the assumption that ContainerState should not be null if
|
|
|
+ // the finish information is already recorded
|
|
|
+ if (data.getContainerState() != null) {
|
|
|
+ throw new IOException("The finish information of container "
|
|
|
+ + containerFinish.getContainerId() + " is already stored.");
|
|
|
+ }
|
|
|
+ data.setFinishTime(containerFinish.getFinishTime());
|
|
|
+ data.setDiagnosticsInfo(containerFinish.getDiagnosticsInfo());
|
|
|
+ data.setLogURL(containerFinish.getLogURL());
|
|
|
+ data.setContainerExitStatus(containerFinish.getContainerExitStatus());
|
|
|
+ data.setContainerState(containerFinish.getContainerState());
|
|
|
+ }
|
|
|
+
|
|
|
+ private ConcurrentMap<ContainerId, ContainerHistoryData> getSubMap(
|
|
|
+ ApplicationAttemptId appAttemptId) {
|
|
|
+ containerData.putIfAbsent(appAttemptId,
|
|
|
+ new ConcurrentHashMap<ContainerId, ContainerHistoryData>());
|
|
|
+ return containerData.get(appAttemptId);
|
|
|
+ }
|
|
|
+
|
|
|
}
|