Quellcode durchsuchen

YARN-975. Added a file-system implementation for HistoryStorage. Contributed by Zhijie Shen.
svn merge --ignore-ancestry -c 1556727 ../YARN-321


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1562184 13f79535-47bb-0310-9956-ffa450edef68

Vinod Kumar Vavilapalli vor 11 Jahren
Ursprung
Commit
a47d981c6e

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -477,6 +477,9 @@ Branch YARN-321: Generic ApplicationHistoryService
   YARN-1007. Enhance History Reader interface for Containers. (Mayank Bansal via
   devaraj)
 
+  YARN-975. Added a file-system implementation for HistoryStorage. (Zhijie Shen
+  via vinodkv)
+
 Release 2.2.0 - 2013-10-13
 
   INCOMPATIBLE CHANGES

+ 13 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -931,6 +931,19 @@ public class YarnConfiguration extends Configuration {
   public static final String YARN_APP_CONTAINER_LOG_BACKUPS =
       YARN_PREFIX + "app.container.log.backups";
 
+  ////////////////////////////////
+  // AHS Configs
+  ////////////////////////////////
+
+  public static final String AHS_PREFIX = YARN_PREFIX + "ahs.";
+
+  /** URI for FileSystemApplicationHistoryStore */
+  public static final String FS_HISTORY_STORE_URI = AHS_PREFIX + "fs-history-store.uri";
+
+  /** T-file compression types used to compress history data.*/
+  public static final String FS_HISTORY_STORE_COMPRESSION_TYPE = AHS_PREFIX + "fs-history-store.compression-type";
+  public static final String DEFAULT_FS_HISTORY_STORE_COMPRESSION_TYPE = "none";
+
   ////////////////////////////////
   // Other Configs
   ////////////////////////////////

+ 17 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml

@@ -1041,6 +1041,23 @@
     <value></value>
   </property>
 
+  <!-- Application History Service's Configuration-->
+
+  <property>
+    <description>URI pointing to the location of the FileSystem path where
+    the history will be persisted. This must be supplied when using
+    org.apache.hadoop.yarn.server.applicationhistoryservice.FileSystemApplicationHistoryStore
+    as the value for yarn.resourcemanager.ahs.writer.class</description>
+    <name>yarn.ahs.fs-history-store.uri</name>
+    <value>${hadoop.log.dir}/yarn/system/ahstore</value>
+  </property>
+
+  <property>
+    <description>T-file compression types used to compress history data.</description>
+    <name>yarn.ahs.fs-history-store.compression-type</name>
+    <value>none</value>
+  </property>
+
   <!-- Other configuration -->
   <property>
     <description>The interval that the yarn client library uses to poll the

+ 860 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/FileSystemApplicationHistoryStore.java

@@ -0,0 +1,860 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.file.tfile.TFile;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ApplicationAttemptFinishDataProto;
+import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ApplicationAttemptStartDataProto;
+import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ApplicationFinishDataProto;
+import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ApplicationStartDataProto;
+import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ContainerFinishDataProto;
+import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ContainerStartDataProto;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptFinishData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptStartData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationFinishData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationStartData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerFinishData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerStartData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ApplicationAttemptFinishDataPBImpl;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ApplicationAttemptStartDataPBImpl;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ApplicationFinishDataPBImpl;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ApplicationStartDataPBImpl;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ContainerFinishDataPBImpl;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ContainerStartDataPBImpl;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * File system implementation of {@link ApplicationHistoryStore}. In this
+ * implementation, one application will have just one file in the file system,
+ * which contains all the history data of one application, and its attempts and
+ * containers. {@link #applicationStarted(ApplicationStartData)} is supposed to
+ * be invoked first when writing any history data of one application and it will
+ * open a file, while {@link #applicationFinished(ApplicationFinishData)} is
+ * supposed to be last writing operation and will close the file.
+ */
+@Public
+@Unstable
+public class FileSystemApplicationHistoryStore extends AbstractService
+    implements ApplicationHistoryStore {
+
+  private static final Log LOG = LogFactory
+      .getLog(FileSystemApplicationHistoryStore.class);
+
+  private static final String ROOT_DIR_NAME = "ApplicationHistoryDataRoot";
+  private static final int MIN_BLOCK_SIZE = 256 * 1024;
+  private static final String START_DATA_SUFFIX = "_start";
+  private static final String FINISH_DATA_SUFFIX = "_finish";
+  private static final FsPermission ROOT_DIR_UMASK =
+      FsPermission.createImmutable((short) 0740);
+  private static final FsPermission HISTORY_FILE_UMASK =
+      FsPermission.createImmutable((short) 0640);
+
+  private FileSystem fs;
+  private Path rootDirPath;
+
+  private ConcurrentMap<ApplicationId, HistoryFileWriter> outstandingWriters =
+      new ConcurrentHashMap<ApplicationId, HistoryFileWriter>();
+
+  public FileSystemApplicationHistoryStore() {
+    super(FileSystemApplicationHistoryStore.class.getName());
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    Path fsWorkingPath = new Path(
+        conf.get(YarnConfiguration.FS_HISTORY_STORE_URI));
+    rootDirPath = new Path(fsWorkingPath, ROOT_DIR_NAME);
+    try {
+      fs = fsWorkingPath.getFileSystem(conf);
+      fs.mkdirs(rootDirPath);
+      fs.setPermission(rootDirPath, ROOT_DIR_UMASK);
+    } catch (IOException e) {
+      LOG.error("Error when initializing FileSystemHistoryStorage", e);
+      throw e;
+    }
+    super.serviceInit(conf);
+  }
+
+  @Override
+  public void serviceStop() throws Exception {
+    try {
+      for (Entry<ApplicationId, HistoryFileWriter> entry : outstandingWriters
+          .entrySet()) {
+        entry.getValue().close();
+      }
+      outstandingWriters.clear();
+    } finally {
+      IOUtils.cleanup(LOG, fs);
+    }
+    super.serviceStop();
+  }
+
+  @Override
+  public ApplicationHistoryData getApplication(ApplicationId appId)
+      throws IOException {
+    HistoryFileReader hfReader = getHistoryFileReader(appId);
+    try {
+      boolean readStartData = false;
+      boolean readFinishData = false;
+      ApplicationHistoryData historyData =
+          ApplicationHistoryData.newInstance(
+              appId, null, null, null, null, Long.MIN_VALUE, Long.MIN_VALUE,
+              Long.MAX_VALUE, null, FinalApplicationStatus.UNDEFINED, null);
+      while ((!readStartData || !readFinishData) && hfReader.hasNext()) {
+        HistoryFileReader.Entry entry = hfReader.next();
+        if (entry.key.id.equals(appId.toString())) {
+          if (entry.key.suffix.equals(START_DATA_SUFFIX)) {
+            ApplicationStartData startData =
+                parseApplicationStartData(entry.value);
+            mergeApplicationHistoryData(historyData, startData);
+            readStartData = true;
+          } else if (entry.key.suffix.equals(FINISH_DATA_SUFFIX)) {
+            ApplicationFinishData finishData =
+                parseApplicationFinishData(entry.value);
+            mergeApplicationHistoryData(historyData, finishData);
+            readFinishData = true;
+          }
+        }
+      }
+      if (!readStartData && !readFinishData) {
+        return null;
+      }
+      if (!readStartData) {
+        LOG.warn("Start information is missing for application " + appId);
+      }
+      if (!readFinishData) {
+        LOG.warn("Finish information is missing for application " + appId);
+      }
+      LOG.info("Completed reading history information of application " + appId);
+      return historyData;
+    } catch (IOException e) {
+      LOG.error("Error when reading history file of application " + appId);
+      throw e;
+    } finally {
+      hfReader.close();
+    }
+  }
+
+  @Override
+  public Map<ApplicationId, ApplicationHistoryData> getAllApplications()
+      throws IOException {
+    Map<ApplicationId, ApplicationHistoryData> historyDataMap =
+        new HashMap<ApplicationId, ApplicationHistoryData>();
+    FileStatus[] files = fs.listStatus(rootDirPath);
+    for (FileStatus file : files) {
+      ApplicationId appId =
+          ConverterUtils.toApplicationId(file.getPath().getName());
+      try {
+        ApplicationHistoryData historyData = getApplication(appId);
+        if (historyData != null) {
+          historyDataMap.put(appId, historyData);
+        }
+      } catch (IOException e) {
+        // Eat the exception not to disturb the getting the next
+        // ApplicationHistoryData
+        LOG.error("History information of application " + appId
+            + " is not included into the result due to the exception", e);
+      }
+    }
+    return historyDataMap;
+  }
+
+  @Override
+  public Map<ApplicationAttemptId, ApplicationAttemptHistoryData>
+      getApplicationAttempts(ApplicationId appId) throws IOException {
+    Map<ApplicationAttemptId, ApplicationAttemptHistoryData> historyDataMap =
+        new HashMap<ApplicationAttemptId, ApplicationAttemptHistoryData>();
+    Map<ApplicationAttemptId, StartFinishDataPair<ApplicationAttemptStartData, ApplicationAttemptFinishData>> startFinshDataMap =
+        new HashMap<ApplicationAttemptId, StartFinishDataPair<ApplicationAttemptStartData, ApplicationAttemptFinishData>>();
+    HistoryFileReader hfReader = getHistoryFileReader(appId);
+    try {
+      while (hfReader.hasNext()) {
+        HistoryFileReader.Entry entry = hfReader.next();
+        if (entry.key.id.startsWith(ConverterUtils.APPLICATION_ATTEMPT_PREFIX)) {
+          if (entry.key.suffix.equals(START_DATA_SUFFIX)) {
+            retrieveStartFinishData(appId, entry, startFinshDataMap, true);
+          } else if (entry.key.suffix.equals(FINISH_DATA_SUFFIX)) {
+            retrieveStartFinishData(appId, entry, startFinshDataMap, false);
+          }
+        }
+      }
+      LOG.info("Completed reading history information of all application"
+          + " attempts of application " + appId);
+    } catch (IOException e) {
+      LOG.info("Error when reading history information of some application"
+          + " attempts of application " + appId);
+    } finally {
+      hfReader.close();
+    }
+    for (Map.Entry<ApplicationAttemptId, StartFinishDataPair<ApplicationAttemptStartData, ApplicationAttemptFinishData>> entry : startFinshDataMap
+        .entrySet()) {
+      ApplicationAttemptHistoryData historyData =
+          ApplicationAttemptHistoryData.newInstance(
+              entry.getKey(), null, -1, null, null, null,
+              FinalApplicationStatus.UNDEFINED, null);
+      mergeApplicationAttemptHistoryData(historyData,
+          entry.getValue().startData);
+      mergeApplicationAttemptHistoryData(historyData,
+          entry.getValue().finishData);
+      historyDataMap.put(entry.getKey(), historyData);
+    }
+    return historyDataMap;
+  }
+
+  private
+      void
+      retrieveStartFinishData(
+          ApplicationId appId,
+          HistoryFileReader.Entry entry,
+          Map<ApplicationAttemptId, StartFinishDataPair<ApplicationAttemptStartData, ApplicationAttemptFinishData>> startFinshDataMap,
+          boolean start) throws IOException {
+    ApplicationAttemptId appAttemptId =
+        ConverterUtils.toApplicationAttemptId(entry.key.id);
+    if (appAttemptId.getApplicationId().equals(appId)) {
+      StartFinishDataPair<ApplicationAttemptStartData, ApplicationAttemptFinishData> pair =
+          startFinshDataMap.get(appAttemptId);
+      if (pair == null) {
+        pair =
+            new StartFinishDataPair<ApplicationAttemptStartData, ApplicationAttemptFinishData>();
+        startFinshDataMap.put(appAttemptId, pair);
+      }
+      if (start) {
+        pair.startData = parseApplicationAttemptStartData(entry.value);
+      } else {
+        pair.finishData = parseApplicationAttemptFinishData(entry.value);
+      }
+    }
+  }
+
+  @Override
+  public ApplicationAttemptHistoryData getApplicationAttempt(
+      ApplicationAttemptId appAttemptId) throws IOException {
+    HistoryFileReader hfReader =
+        getHistoryFileReader(appAttemptId.getApplicationId());
+    try {
+      boolean readStartData = false;
+      boolean readFinishData = false;
+      ApplicationAttemptHistoryData historyData =
+          ApplicationAttemptHistoryData.newInstance(
+              appAttemptId, null, -1, null, null, null,
+              FinalApplicationStatus.UNDEFINED, null);
+      while ((!readStartData || !readFinishData) && hfReader.hasNext()) {
+        HistoryFileReader.Entry entry = hfReader.next();
+        if (entry.key.id.equals(appAttemptId.toString())) {
+          if (entry.key.suffix.equals(START_DATA_SUFFIX)) {
+            ApplicationAttemptStartData startData =
+                parseApplicationAttemptStartData(entry.value);
+            mergeApplicationAttemptHistoryData(historyData, startData);
+            readStartData = true;
+          } else if (entry.key.suffix.equals(FINISH_DATA_SUFFIX)) {
+            ApplicationAttemptFinishData finishData =
+                parseApplicationAttemptFinishData(entry.value);
+            mergeApplicationAttemptHistoryData(historyData, finishData);
+            readFinishData = true;
+          }
+        }
+      }
+      if (!readStartData && !readFinishData) {
+        return null;
+      }
+      if (!readStartData) {
+        LOG.warn("Start information is missing for application attempt "
+            + appAttemptId);
+      }
+      if (!readFinishData) {
+        LOG.warn("Finish information is missing for application attempt "
+            + appAttemptId);
+      }
+      LOG.info("Completed reading history information of application attempt "
+          + appAttemptId);
+      return historyData;
+    } catch (IOException e) {
+      LOG.error("Error when reading history file of application attempt"
+          + appAttemptId);
+      throw e;
+    } finally {
+      hfReader.close();
+    }
+  }
+
+  @Override
+  public ContainerHistoryData getContainer(ContainerId containerId)
+      throws IOException {
+    HistoryFileReader hfReader =
+        getHistoryFileReader(containerId.getApplicationAttemptId()
+            .getApplicationId());
+    try {
+      boolean readStartData = false;
+      boolean readFinishData = false;
+      ContainerHistoryData historyData =
+          ContainerHistoryData.newInstance(containerId, null, null, null,
+              Long.MIN_VALUE, Long.MAX_VALUE, null, null, Integer.MAX_VALUE,
+              null);
+      while ((!readStartData || !readFinishData) && hfReader.hasNext()) {
+        HistoryFileReader.Entry entry = hfReader.next();
+        if (entry.key.id.equals(containerId.toString())) {
+          if (entry.key.suffix.equals(START_DATA_SUFFIX)) {
+            ContainerStartData startData =
+                parseContainerStartData(entry.value);
+            mergeContainerHistoryData(historyData, startData);
+            readStartData = true;
+          } else if (entry.key.suffix.equals(FINISH_DATA_SUFFIX)) {
+            ContainerFinishData finishData =
+                parseContainerFinishData(entry.value);
+            mergeContainerHistoryData(historyData, finishData);
+            readFinishData = true;
+          }
+        }
+      }
+      if (!readStartData && !readFinishData) {
+        return null;
+      }
+      if (!readStartData) {
+        LOG.warn("Start information is missing for container " + containerId);
+      }
+      if (!readFinishData) {
+        LOG.warn("Finish information is missing for container " + containerId);
+      }
+      LOG.info("Completed reading history information of container "
+          + containerId);
+      return historyData;
+    } catch (IOException e) {
+      LOG.error("Error when reading history file of container " + containerId);
+      throw e;
+    } finally {
+      hfReader.close();
+    }
+  }
+
+  @Override
+  public ContainerHistoryData getAMContainer(ApplicationAttemptId appAttemptId)
+      throws IOException {
+    ApplicationAttemptHistoryData attemptHistoryData =
+        getApplicationAttempt(appAttemptId);
+    if (attemptHistoryData == null
+        || attemptHistoryData.getMasterContainerId() == null) {
+      return null;
+    }
+    return getContainer(attemptHistoryData.getMasterContainerId());
+  }
+
+  @Override
+  public Map<ContainerId, ContainerHistoryData> getContainers(
+      ApplicationAttemptId appAttemptId) throws IOException {
+    Map<ContainerId, ContainerHistoryData> historyDataMap =
+        new HashMap<ContainerId, ContainerHistoryData>();
+    Map<ContainerId, StartFinishDataPair<ContainerStartData, ContainerFinishData>> startFinshDataMap =
+        new HashMap<ContainerId, StartFinishDataPair<ContainerStartData, ContainerFinishData>>();
+    HistoryFileReader hfReader =
+        getHistoryFileReader(appAttemptId.getApplicationId());
+    try {
+      while (hfReader.hasNext()) {
+        HistoryFileReader.Entry entry = hfReader.next();
+        if (entry.key.id.startsWith(ConverterUtils.CONTAINER_PREFIX)) {
+          if (entry.key.suffix.equals(START_DATA_SUFFIX)) {
+            retrieveStartFinishData(appAttemptId, entry, startFinshDataMap,
+                true);
+          } else if (entry.key.suffix.equals(FINISH_DATA_SUFFIX)) {
+            retrieveStartFinishData(appAttemptId, entry, startFinshDataMap,
+                false);
+          }
+        }
+      }
+      LOG.info("Completed reading history information of all conatiners"
+          + " of application attempt " + appAttemptId);
+    } catch (IOException e) {
+      LOG.info("Error when reading history information of some containers"
+          + " of application attempt " + appAttemptId);
+    } finally {
+      hfReader.close();
+    }
+    for (Map.Entry<ContainerId, StartFinishDataPair<ContainerStartData, ContainerFinishData>> entry : startFinshDataMap
+        .entrySet()) {
+      ContainerHistoryData historyData =
+          ContainerHistoryData.newInstance(entry.getKey(), null, null, null,
+              Long.MIN_VALUE, Long.MAX_VALUE, null, null, Integer.MAX_VALUE,
+              null);
+      mergeContainerHistoryData(historyData, entry.getValue().startData);
+      mergeContainerHistoryData(historyData, entry.getValue().finishData);
+      historyDataMap.put(entry.getKey(), historyData);
+    }
+    return historyDataMap;
+  }
+
+  private
+      void
+      retrieveStartFinishData(
+          ApplicationAttemptId appAttemptId,
+          HistoryFileReader.Entry entry,
+          Map<ContainerId, StartFinishDataPair<ContainerStartData, ContainerFinishData>> startFinshDataMap,
+          boolean start) throws IOException {
+    ContainerId containerId =
+        ConverterUtils.toContainerId(entry.key.id);
+    if (containerId.getApplicationAttemptId().equals(appAttemptId)) {
+      StartFinishDataPair<ContainerStartData, ContainerFinishData> pair =
+          startFinshDataMap.get(containerId);
+      if (pair == null) {
+        pair =
+            new StartFinishDataPair<ContainerStartData, ContainerFinishData>();
+        startFinshDataMap.put(containerId, pair);
+      }
+      if (start) {
+        pair.startData = parseContainerStartData(entry.value);
+      } else {
+        pair.finishData = parseContainerFinishData(entry.value);
+      }
+    }
+  }
+
+  @Override
+  public void applicationStarted(ApplicationStartData appStart)
+      throws IOException {
+    HistoryFileWriter hfWriter =
+        outstandingWriters.get(appStart.getApplicationId());
+    if (hfWriter == null) {
+      Path applicationHistoryFile =
+          new Path(rootDirPath, appStart.getApplicationId().toString());
+      try {
+        hfWriter = new HistoryFileWriter(applicationHistoryFile);
+        LOG.info("Opened history file of application "
+            + appStart.getApplicationId());
+      } catch (IOException e) {
+        LOG.error("Error when openning history file of application "
+            + appStart.getApplicationId());
+        throw e;
+      }
+      outstandingWriters.put(appStart.getApplicationId(), hfWriter);
+    } else {
+      throw new IOException("History file of application "
+          + appStart.getApplicationId() + " is already opened");
+    }
+    assert appStart instanceof ApplicationStartDataPBImpl;
+    try {
+      hfWriter.writeHistoryData(new HistoryDataKey(appStart.getApplicationId()
+          .toString(), START_DATA_SUFFIX),
+          ((ApplicationStartDataPBImpl) appStart)
+              .getProto().toByteArray());
+      LOG.info("Start information of application "
+          + appStart.getApplicationId() + " is written");
+    } catch (IOException e) {
+      LOG.error("Error when writing start information of application "
+          + appStart.getApplicationId());
+      throw e;
+    }
+  }
+
+  @Override
+  public void applicationFinished(ApplicationFinishData appFinish)
+      throws IOException {
+    HistoryFileWriter hfWriter =
+        getHistoryFileWriter(appFinish.getApplicationId());
+    assert appFinish instanceof ApplicationFinishDataPBImpl;
+    try {
+      hfWriter.writeHistoryData(
+          new HistoryDataKey(appFinish.getApplicationId().toString(),
+              FINISH_DATA_SUFFIX),
+          ((ApplicationFinishDataPBImpl) appFinish).getProto().toByteArray());
+      LOG.info("Finish information of application "
+          + appFinish.getApplicationId() + " is written");
+    } catch (IOException e) {
+      LOG.error("Error when writing finish information of application "
+          + appFinish.getApplicationId());
+      throw e;
+    } finally {
+      hfWriter.close();
+      outstandingWriters.remove(appFinish.getApplicationId());
+    }
+  }
+
+  @Override
+  public void applicationAttemptStarted(
+      ApplicationAttemptStartData appAttemptStart) throws IOException {
+    HistoryFileWriter hfWriter =
+        getHistoryFileWriter(appAttemptStart.getApplicationAttemptId()
+            .getApplicationId());
+    assert appAttemptStart instanceof ApplicationAttemptStartDataPBImpl;
+    try {
+      hfWriter.writeHistoryData(
+          new HistoryDataKey(appAttemptStart.getApplicationAttemptId()
+              .toString(),
+              START_DATA_SUFFIX),
+          ((ApplicationAttemptStartDataPBImpl) appAttemptStart).getProto()
+              .toByteArray());
+      LOG.info("Start information of application attempt "
+          + appAttemptStart.getApplicationAttemptId() + " is written");
+    } catch (IOException e) {
+      LOG.error("Error when writing start information of application attempt "
+          + appAttemptStart.getApplicationAttemptId());
+      throw e;
+    }
+  }
+
+  @Override
+  public void applicationAttemptFinished(
+      ApplicationAttemptFinishData appAttemptFinish) throws IOException {
+    HistoryFileWriter hfWriter =
+        getHistoryFileWriter(appAttemptFinish.getApplicationAttemptId()
+            .getApplicationId());
+    assert appAttemptFinish instanceof ApplicationAttemptFinishDataPBImpl;
+    try {
+      hfWriter.writeHistoryData(
+          new HistoryDataKey(appAttemptFinish.getApplicationAttemptId()
+              .toString(),
+              FINISH_DATA_SUFFIX),
+          ((ApplicationAttemptFinishDataPBImpl) appAttemptFinish).getProto()
+              .toByteArray());
+      LOG.info("Finish information of application attempt "
+          + appAttemptFinish.getApplicationAttemptId() + " is written");
+    } catch (IOException e) {
+      LOG.error("Error when writing finish information of application attempt "
+          + appAttemptFinish.getApplicationAttemptId());
+      throw e;
+    }
+  }
+
+  @Override
+  public void containerStarted(ContainerStartData containerStart)
+      throws IOException {
+    HistoryFileWriter hfWriter =
+        getHistoryFileWriter(containerStart.getContainerId()
+            .getApplicationAttemptId()
+            .getApplicationId());
+    assert containerStart instanceof ContainerStartDataPBImpl;
+    try {
+      hfWriter.writeHistoryData(
+          new HistoryDataKey(containerStart.getContainerId().toString(),
+              START_DATA_SUFFIX),
+          ((ContainerStartDataPBImpl) containerStart).getProto().toByteArray());
+      LOG.info("Start information of container "
+          + containerStart.getContainerId() + " is written");
+    } catch (IOException e) {
+      LOG.error("Error when writing start information of container "
+          + containerStart.getContainerId());
+      throw e;
+    }
+  }
+
+  @Override
+  public void containerFinished(ContainerFinishData containerFinish)
+      throws IOException {
+    HistoryFileWriter hfWriter =
+        getHistoryFileWriter(containerFinish.getContainerId()
+            .getApplicationAttemptId().getApplicationId());
+    assert containerFinish instanceof ContainerFinishDataPBImpl;
+    try {
+      hfWriter.writeHistoryData(
+          new HistoryDataKey(containerFinish.getContainerId().toString(),
+              FINISH_DATA_SUFFIX),
+          ((ContainerFinishDataPBImpl) containerFinish).getProto()
+              .toByteArray());
+      LOG.info("Finish information of container "
+          + containerFinish.getContainerId() + " is written");
+    } catch (IOException e) {
+      LOG.error("Error when writing finish information of container "
+          + containerFinish.getContainerId());
+    }
+  }
+
+  private static ApplicationStartData parseApplicationStartData(byte[] value)
+      throws InvalidProtocolBufferException {
+    return new ApplicationStartDataPBImpl(
+        ApplicationStartDataProto.parseFrom(value));
+  }
+
+  private static ApplicationFinishData parseApplicationFinishData(byte[] value)
+      throws InvalidProtocolBufferException {
+    return new ApplicationFinishDataPBImpl(
+        ApplicationFinishDataProto.parseFrom(value));
+  }
+
+  private static ApplicationAttemptStartData parseApplicationAttemptStartData(
+      byte[] value) throws InvalidProtocolBufferException {
+    return new ApplicationAttemptStartDataPBImpl(
+        ApplicationAttemptStartDataProto.parseFrom(value));
+  }
+
+  private static ApplicationAttemptFinishData
+      parseApplicationAttemptFinishData(
+          byte[] value) throws InvalidProtocolBufferException {
+    return new ApplicationAttemptFinishDataPBImpl(
+        ApplicationAttemptFinishDataProto.parseFrom(value));
+  }
+
+  private static ContainerStartData parseContainerStartData(byte[] value)
+      throws InvalidProtocolBufferException {
+    return new ContainerStartDataPBImpl(
+        ContainerStartDataProto.parseFrom(value));
+  }
+
+  private static ContainerFinishData parseContainerFinishData(byte[] value)
+      throws InvalidProtocolBufferException {
+    return new ContainerFinishDataPBImpl(
+        ContainerFinishDataProto.parseFrom(value));
+  }
+
+  private static void mergeApplicationHistoryData(
+      ApplicationHistoryData historyData,
+      ApplicationStartData startData) {
+    historyData.setApplicationName(startData.getApplicationName());
+    historyData.setApplicationType(startData.getApplicationType());
+    historyData.setQueue(startData.getQueue());
+    historyData.setUser(startData.getUser());
+    historyData.setSubmitTime(startData.getSubmitTime());
+    historyData.setStartTime(startData.getStartTime());
+  }
+
+  private static void mergeApplicationHistoryData(
+      ApplicationHistoryData historyData,
+      ApplicationFinishData finishData) {
+    historyData.setFinishTime(finishData.getFinishTime());
+    historyData.setDiagnosticsInfo(finishData.getDiagnosticsInfo());
+    historyData.setFinalApplicationStatus(finishData
+        .getFinalApplicationStatus());
+    historyData.setYarnApplicationState(finishData.getYarnApplicationState());
+  }
+
+  private static void mergeApplicationAttemptHistoryData(
+      ApplicationAttemptHistoryData historyData,
+      ApplicationAttemptStartData startData) {
+    historyData.setHost(startData.getHost());
+    historyData.setRPCPort(startData.getRPCPort());
+    historyData.setMasterContainerId(startData.getMasterContainerId());
+  }
+
+  private static void mergeApplicationAttemptHistoryData(
+      ApplicationAttemptHistoryData historyData,
+      ApplicationAttemptFinishData finishData) {
+    historyData.setDiagnosticsInfo(finishData.getDiagnosticsInfo());
+    historyData.setTrackingURL(finishData.getTrackingURL());
+    historyData.setFinalApplicationStatus(finishData
+        .getFinalApplicationStatus());
+    historyData.setYarnApplicationAttemptState(finishData
+        .getYarnApplicationAttemptState());
+  }
+
+  private static void mergeContainerHistoryData(
+      ContainerHistoryData historyData, ContainerStartData startData) {
+    historyData.setAllocatedResource(startData.getAllocatedResource());
+    historyData.setAssignedNode(startData.getAssignedNode());
+    historyData.setPriority(startData.getPriority());
+    historyData.setStartTime(startData.getStartTime());
+  }
+
+  private static void mergeContainerHistoryData(
+      ContainerHistoryData historyData, ContainerFinishData finishData) {
+    historyData.setFinishTime(finishData.getFinishTime());
+    historyData.setDiagnosticsInfo(finishData.getDiagnosticsInfo());
+    historyData.setLogURL(finishData.getLogURL());
+    historyData.setContainerExitStatus(finishData
+        .getContainerExitStatus());
+    historyData.setContainerState(finishData.getContainerState());
+  }
+
+  private HistoryFileWriter getHistoryFileWriter(ApplicationId appId)
+      throws IOException {
+    HistoryFileWriter hfWriter = outstandingWriters.get(appId);
+    if (hfWriter == null) {
+      throw new IOException("History file of application " + appId
+          + " is not opened");
+    }
+    return hfWriter;
+  }
+
+  private HistoryFileReader getHistoryFileReader(ApplicationId appId)
+      throws IOException {
+    Path applicationHistoryFile = new Path(rootDirPath, appId.toString());
+    if (!fs.exists(applicationHistoryFile)) {
+      throw new IOException("History file for application " + appId
+          + " is not found");
+    }
+    // The history file is still under writing
+    if (outstandingWriters.containsKey(appId)) {
+      throw new IOException("History file for application " + appId
+          + " is under writing");
+    }
+    return new HistoryFileReader(applicationHistoryFile);
+  }
+
+  private class HistoryFileReader {
+
+    private class Entry {
+
+      private HistoryDataKey key;
+      private byte[] value;
+
+      public Entry(HistoryDataKey key, byte[] value) {
+        this.key = key;
+        this.value = value;
+      }
+    }
+
+    private FSDataInputStream fsdis;
+    private TFile.Reader reader;
+    private TFile.Reader.Scanner scanner;
+
+    public HistoryFileReader(Path historyFile) throws IOException {
+      FSDataInputStream fsdis = fs.open(historyFile);
+      reader =
+          new TFile.Reader(fsdis, fs.getFileStatus(historyFile).getLen(),
+              getConfig());
+      reset();
+    }
+
+    public boolean hasNext() {
+      return !scanner.atEnd();
+    }
+
+    public Entry next() throws IOException {
+      TFile.Reader.Scanner.Entry entry = scanner.entry();
+      DataInputStream dis = entry.getKeyStream();
+      HistoryDataKey key = new HistoryDataKey();
+      key.readFields(dis);
+      dis = entry.getValueStream();
+      byte[] value = new byte[entry.getValueLength()];
+      dis.read(value);
+      scanner.advance();
+      return new Entry(key, value);
+    }
+
+    public void reset() throws IOException {
+      IOUtils.cleanup(LOG, scanner);
+      scanner = reader.createScanner();
+    }
+
+    public void close() {
+      IOUtils.cleanup(LOG, scanner, reader, fsdis);
+    }
+
+  }
+
+  private class HistoryFileWriter {
+
+    private FSDataOutputStream fsdos;
+    private TFile.Writer writer;
+
+    public HistoryFileWriter(Path historyFile)
+        throws IOException {
+      if (fs.exists(historyFile)) {
+        fsdos = fs.append(historyFile);
+      } else {
+        fsdos = fs.create(historyFile);
+      }
+      fs.setPermission(historyFile, HISTORY_FILE_UMASK);
+      writer =
+          new TFile.Writer(fsdos, MIN_BLOCK_SIZE, getConfig().get(
+              YarnConfiguration.FS_HISTORY_STORE_COMPRESSION_TYPE,
+              YarnConfiguration.DEFAULT_FS_HISTORY_STORE_COMPRESSION_TYPE),
+              null, getConfig());
+    }
+
+    public synchronized void close() {
+      IOUtils.cleanup(LOG, writer, fsdos);
+    }
+
+    public synchronized void writeHistoryData(HistoryDataKey key, byte[] value)
+        throws IOException {
+      DataOutputStream dos = null;
+      try {
+        dos = writer.prepareAppendKey(-1);
+        key.write(dos);
+      } finally {
+        IOUtils.cleanup(LOG, dos);
+      }
+      try {
+        dos = writer.prepareAppendValue(value.length);
+        dos.write(value);
+      } finally {
+        IOUtils.cleanup(LOG, dos);
+      }
+    }
+
+  }
+
+  private static class HistoryDataKey implements Writable {
+
+    private String id;
+
+    private String suffix;
+
+    public HistoryDataKey() {
+      this(null, null);
+    }
+
+    public HistoryDataKey(String id, String suffix) {
+      this.id = id;
+      this.suffix = suffix;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      out.writeUTF(id);
+      out.writeUTF(suffix);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      id = in.readUTF();
+      suffix = in.readUTF();
+    }
+
+  }
+
+  private static class StartFinishDataPair<S, F> {
+
+    private S startData;
+    private F finishData;
+
+  }
+
+}

+ 198 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestFileSystemApplicationHistoryStore.java

@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice;
+
+import java.io.IOException;
+import java.net.URI;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationHistoryData;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerHistoryData;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestFileSystemApplicationHistoryStore extends
+    ApplicationHistoryStoreTestUtils {
+
+  private FileSystem fs;
+  private Path fsWorkingPath;
+
+  @Before
+  public void setup() throws Exception {
+    fs = new RawLocalFileSystem();
+    Configuration conf = new Configuration();
+    fs.initialize(new URI("/"), conf);
+    fsWorkingPath = new Path("Test");
+    fs.delete(fsWorkingPath, true);
+    conf.set(YarnConfiguration.FS_HISTORY_STORE_URI, fsWorkingPath.toString());
+    store = new FileSystemApplicationHistoryStore();
+    store.init(conf);
+    store.start();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    store.stop();
+    fs.delete(fsWorkingPath, true);
+    fs.close();
+  }
+
+  @Test
+  public void testReadWriteHistoryData() throws IOException {
+    testWriteHistoryData(5);
+    testReadHistoryData(5);
+  }
+
+  private void testWriteHistoryData(int num) throws IOException {
+    // write application history data
+    for (int i = 1; i <= num; ++i) {
+      ApplicationId appId = ApplicationId.newInstance(0, i);
+      writeApplicationStartData(appId);
+
+      // write application attempt history data
+      for (int j = 1; j <= num; ++j) {
+        ApplicationAttemptId appAttemptId =
+            ApplicationAttemptId.newInstance(appId, j);
+        writeApplicationAttemptStartData(appAttemptId);
+
+        // write container history data
+        for (int k = 1; k <= num; ++k) {
+          ContainerId containerId = ContainerId.newInstance(appAttemptId, k);
+          writeContainerStartData(containerId);
+          writeContainerFinishData(containerId);
+
+          writeApplicationAttemptFinishData(appAttemptId);
+        }
+      }
+
+      writeApplicationFinishData(appId);
+    }
+  }
+
+  private void testReadHistoryData(int num) throws IOException {
+    // read application history data
+    Assert.assertEquals(num, store.getAllApplications().size());
+    for (int i = 1; i <= num; ++i) {
+      ApplicationId appId = ApplicationId.newInstance(0, i);
+      ApplicationHistoryData appData = store.getApplication(appId);
+      Assert.assertNotNull(appData);
+      Assert.assertEquals(appId.toString(), appData.getApplicationName());
+      Assert.assertEquals(appId.toString(), appData.getDiagnosticsInfo());
+
+      // read application attempt history data
+      Assert.assertEquals(
+          num, store.getApplicationAttempts(appId).size());
+      for (int j = 1; j <= num; ++j) {
+        ApplicationAttemptId appAttemptId =
+            ApplicationAttemptId.newInstance(appId, j);
+        ApplicationAttemptHistoryData attemptData =
+            store.getApplicationAttempt(appAttemptId);
+        Assert.assertNotNull(attemptData);
+        Assert.assertEquals(appAttemptId.toString(), attemptData.getHost());
+        Assert.assertEquals(appAttemptId.toString(),
+            attemptData.getDiagnosticsInfo());
+
+        // read container history data
+        Assert.assertEquals(
+            num, store.getContainers(appAttemptId).size());
+        for (int k = 1; k <= num; ++k) {
+          ContainerId containerId = ContainerId.newInstance(appAttemptId, k);
+          ContainerHistoryData containerData = store.getContainer(containerId);
+          Assert.assertNotNull(containerData);
+          Assert.assertEquals(Priority.newInstance(containerId.getId()),
+              containerData.getPriority());
+          Assert.assertEquals(containerId.toString(),
+              containerData.getDiagnosticsInfo());
+        }
+        ContainerHistoryData masterContainer =
+            store.getAMContainer(appAttemptId);
+        Assert.assertNotNull(masterContainer);
+        Assert.assertEquals(ContainerId.newInstance(appAttemptId, 1),
+            masterContainer.getContainerId());
+      }
+    }
+  }
+
+  @Test
+  public void testWriteAfterApplicationFinish() throws IOException {
+    ApplicationId appId = ApplicationId.newInstance(0, 1);
+    writeApplicationStartData(appId);
+    writeApplicationFinishData(appId);
+    // write application attempt history data
+    ApplicationAttemptId appAttemptId =
+        ApplicationAttemptId.newInstance(appId, 1);
+    try {
+      writeApplicationAttemptStartData(appAttemptId);
+      Assert.fail();
+    } catch (IOException e) {
+      Assert.assertTrue(e.getMessage().contains("is not opened"));
+    }
+    try {
+      writeApplicationAttemptFinishData(appAttemptId);
+      Assert.fail();
+    } catch (IOException e) {
+      Assert.assertTrue(e.getMessage().contains("is not opened"));
+    }
+    // write container history data
+    ContainerId containerId = ContainerId.newInstance(appAttemptId, 1);
+    try {
+      writeContainerStartData(containerId);
+      Assert.fail();
+    } catch (IOException e) {
+      Assert.assertTrue(e.getMessage().contains("is not opened"));
+    }
+    try {
+      writeContainerFinishData(containerId);
+      Assert.fail();
+    } catch (IOException e) {
+      Assert.assertTrue(e.getMessage().contains("is not opened"));
+    }
+  }
+
+  @Test
+  public void testMassiveWriteContainerHistoryData() throws IOException {
+    long mb = 1024 * 1024;
+    long usedDiskBefore = fs.getContentSummary(fsWorkingPath).getLength() / mb;
+    ApplicationId appId = ApplicationId.newInstance(0, 1);
+    writeApplicationStartData(appId);
+    ApplicationAttemptId appAttemptId =
+        ApplicationAttemptId.newInstance(appId, 1);
+    for (int i = 1; i <= 100000; ++i) {
+      ContainerId containerId = ContainerId.newInstance(appAttemptId, i);
+      writeContainerStartData(containerId);
+      writeContainerFinishData(containerId);
+    }
+    writeApplicationFinishData(appId);
+    long usedDiskAfter = fs.getContentSummary(fsWorkingPath).getLength() / mb;
+    Assert.assertTrue((usedDiskAfter - usedDiskBefore) < 20);
+  }
+
+}