|
@@ -89,16 +89,16 @@ public class FileSystemApplicationHistoryStore extends AbstractService
|
|
implements ApplicationHistoryStore {
|
|
implements ApplicationHistoryStore {
|
|
|
|
|
|
private static final Log LOG = LogFactory
|
|
private static final Log LOG = LogFactory
|
|
- .getLog(FileSystemApplicationHistoryStore.class);
|
|
|
|
|
|
+ .getLog(FileSystemApplicationHistoryStore.class);
|
|
|
|
|
|
private static final String ROOT_DIR_NAME = "ApplicationHistoryDataRoot";
|
|
private static final String ROOT_DIR_NAME = "ApplicationHistoryDataRoot";
|
|
private static final int MIN_BLOCK_SIZE = 256 * 1024;
|
|
private static final int MIN_BLOCK_SIZE = 256 * 1024;
|
|
private static final String START_DATA_SUFFIX = "_start";
|
|
private static final String START_DATA_SUFFIX = "_start";
|
|
private static final String FINISH_DATA_SUFFIX = "_finish";
|
|
private static final String FINISH_DATA_SUFFIX = "_finish";
|
|
- private static final FsPermission ROOT_DIR_UMASK =
|
|
|
|
- FsPermission.createImmutable((short) 0740);
|
|
|
|
- private static final FsPermission HISTORY_FILE_UMASK =
|
|
|
|
- FsPermission.createImmutable((short) 0640);
|
|
|
|
|
|
+ private static final FsPermission ROOT_DIR_UMASK = FsPermission
|
|
|
|
+ .createImmutable((short) 0740);
|
|
|
|
+ private static final FsPermission HISTORY_FILE_UMASK = FsPermission
|
|
|
|
+ .createImmutable((short) 0640);
|
|
|
|
|
|
private FileSystem fs;
|
|
private FileSystem fs;
|
|
private Path rootDirPath;
|
|
private Path rootDirPath;
|
|
@@ -112,8 +112,8 @@ public class FileSystemApplicationHistoryStore extends AbstractService
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public void serviceInit(Configuration conf) throws Exception {
|
|
public void serviceInit(Configuration conf) throws Exception {
|
|
- Path fsWorkingPath = new Path(
|
|
|
|
- conf.get(YarnConfiguration.FS_HISTORY_STORE_URI));
|
|
|
|
|
|
+ Path fsWorkingPath =
|
|
|
|
+ new Path(conf.get(YarnConfiguration.FS_HISTORY_STORE_URI));
|
|
rootDirPath = new Path(fsWorkingPath, ROOT_DIR_NAME);
|
|
rootDirPath = new Path(fsWorkingPath, ROOT_DIR_NAME);
|
|
try {
|
|
try {
|
|
fs = fsWorkingPath.getFileSystem(conf);
|
|
fs = fsWorkingPath.getFileSystem(conf);
|
|
@@ -130,7 +130,7 @@ public class FileSystemApplicationHistoryStore extends AbstractService
|
|
public void serviceStop() throws Exception {
|
|
public void serviceStop() throws Exception {
|
|
try {
|
|
try {
|
|
for (Entry<ApplicationId, HistoryFileWriter> entry : outstandingWriters
|
|
for (Entry<ApplicationId, HistoryFileWriter> entry : outstandingWriters
|
|
- .entrySet()) {
|
|
|
|
|
|
+ .entrySet()) {
|
|
entry.getValue().close();
|
|
entry.getValue().close();
|
|
}
|
|
}
|
|
outstandingWriters.clear();
|
|
outstandingWriters.clear();
|
|
@@ -148,9 +148,9 @@ public class FileSystemApplicationHistoryStore extends AbstractService
|
|
boolean readStartData = false;
|
|
boolean readStartData = false;
|
|
boolean readFinishData = false;
|
|
boolean readFinishData = false;
|
|
ApplicationHistoryData historyData =
|
|
ApplicationHistoryData historyData =
|
|
- ApplicationHistoryData.newInstance(
|
|
|
|
- appId, null, null, null, null, Long.MIN_VALUE, Long.MIN_VALUE,
|
|
|
|
- Long.MAX_VALUE, null, FinalApplicationStatus.UNDEFINED, null);
|
|
|
|
|
|
+ ApplicationHistoryData.newInstance(appId, null, null, null, null,
|
|
|
|
+ Long.MIN_VALUE, Long.MIN_VALUE, Long.MAX_VALUE, null,
|
|
|
|
+ FinalApplicationStatus.UNDEFINED, null);
|
|
while ((!readStartData || !readFinishData) && hfReader.hasNext()) {
|
|
while ((!readStartData || !readFinishData) && hfReader.hasNext()) {
|
|
HistoryFileReader.Entry entry = hfReader.next();
|
|
HistoryFileReader.Entry entry = hfReader.next();
|
|
if (entry.key.id.equals(appId.toString())) {
|
|
if (entry.key.id.equals(appId.toString())) {
|
|
@@ -238,15 +238,14 @@ public class FileSystemApplicationHistoryStore extends AbstractService
|
|
hfReader.close();
|
|
hfReader.close();
|
|
}
|
|
}
|
|
for (Map.Entry<ApplicationAttemptId, StartFinishDataPair<ApplicationAttemptStartData, ApplicationAttemptFinishData>> entry : startFinshDataMap
|
|
for (Map.Entry<ApplicationAttemptId, StartFinishDataPair<ApplicationAttemptStartData, ApplicationAttemptFinishData>> entry : startFinshDataMap
|
|
- .entrySet()) {
|
|
|
|
|
|
+ .entrySet()) {
|
|
ApplicationAttemptHistoryData historyData =
|
|
ApplicationAttemptHistoryData historyData =
|
|
- ApplicationAttemptHistoryData.newInstance(
|
|
|
|
- entry.getKey(), null, -1, null, null, null,
|
|
|
|
- FinalApplicationStatus.UNDEFINED, null);
|
|
|
|
|
|
+ ApplicationAttemptHistoryData.newInstance(entry.getKey(), null, -1,
|
|
|
|
+ null, null, null, FinalApplicationStatus.UNDEFINED, null);
|
|
mergeApplicationAttemptHistoryData(historyData,
|
|
mergeApplicationAttemptHistoryData(historyData,
|
|
- entry.getValue().startData);
|
|
|
|
|
|
+ entry.getValue().startData);
|
|
mergeApplicationAttemptHistoryData(historyData,
|
|
mergeApplicationAttemptHistoryData(historyData,
|
|
- entry.getValue().finishData);
|
|
|
|
|
|
+ entry.getValue().finishData);
|
|
historyDataMap.put(entry.getKey(), historyData);
|
|
historyDataMap.put(entry.getKey(), historyData);
|
|
}
|
|
}
|
|
return historyDataMap;
|
|
return historyDataMap;
|
|
@@ -286,9 +285,8 @@ public class FileSystemApplicationHistoryStore extends AbstractService
|
|
boolean readStartData = false;
|
|
boolean readStartData = false;
|
|
boolean readFinishData = false;
|
|
boolean readFinishData = false;
|
|
ApplicationAttemptHistoryData historyData =
|
|
ApplicationAttemptHistoryData historyData =
|
|
- ApplicationAttemptHistoryData.newInstance(
|
|
|
|
- appAttemptId, null, -1, null, null, null,
|
|
|
|
- FinalApplicationStatus.UNDEFINED, null);
|
|
|
|
|
|
+ ApplicationAttemptHistoryData.newInstance(appAttemptId, null, -1,
|
|
|
|
+ null, null, null, FinalApplicationStatus.UNDEFINED, null);
|
|
while ((!readStartData || !readFinishData) && hfReader.hasNext()) {
|
|
while ((!readStartData || !readFinishData) && hfReader.hasNext()) {
|
|
HistoryFileReader.Entry entry = hfReader.next();
|
|
HistoryFileReader.Entry entry = hfReader.next();
|
|
if (entry.key.id.equals(appAttemptId.toString())) {
|
|
if (entry.key.id.equals(appAttemptId.toString())) {
|
|
@@ -333,20 +331,19 @@ public class FileSystemApplicationHistoryStore extends AbstractService
|
|
throws IOException {
|
|
throws IOException {
|
|
HistoryFileReader hfReader =
|
|
HistoryFileReader hfReader =
|
|
getHistoryFileReader(containerId.getApplicationAttemptId()
|
|
getHistoryFileReader(containerId.getApplicationAttemptId()
|
|
- .getApplicationId());
|
|
|
|
|
|
+ .getApplicationId());
|
|
try {
|
|
try {
|
|
boolean readStartData = false;
|
|
boolean readStartData = false;
|
|
boolean readFinishData = false;
|
|
boolean readFinishData = false;
|
|
ContainerHistoryData historyData =
|
|
ContainerHistoryData historyData =
|
|
- ContainerHistoryData.newInstance(containerId, null, null, null,
|
|
|
|
- Long.MIN_VALUE, Long.MAX_VALUE, null, null, Integer.MAX_VALUE,
|
|
|
|
- null);
|
|
|
|
|
|
+ ContainerHistoryData
|
|
|
|
+ .newInstance(containerId, null, null, null, Long.MIN_VALUE,
|
|
|
|
+ Long.MAX_VALUE, null, null, Integer.MAX_VALUE, null);
|
|
while ((!readStartData || !readFinishData) && hfReader.hasNext()) {
|
|
while ((!readStartData || !readFinishData) && hfReader.hasNext()) {
|
|
HistoryFileReader.Entry entry = hfReader.next();
|
|
HistoryFileReader.Entry entry = hfReader.next();
|
|
if (entry.key.id.equals(containerId.toString())) {
|
|
if (entry.key.id.equals(containerId.toString())) {
|
|
if (entry.key.suffix.equals(START_DATA_SUFFIX)) {
|
|
if (entry.key.suffix.equals(START_DATA_SUFFIX)) {
|
|
- ContainerStartData startData =
|
|
|
|
- parseContainerStartData(entry.value);
|
|
|
|
|
|
+ ContainerStartData startData = parseContainerStartData(entry.value);
|
|
mergeContainerHistoryData(historyData, startData);
|
|
mergeContainerHistoryData(historyData, startData);
|
|
readStartData = true;
|
|
readStartData = true;
|
|
} else if (entry.key.suffix.equals(FINISH_DATA_SUFFIX)) {
|
|
} else if (entry.key.suffix.equals(FINISH_DATA_SUFFIX)) {
|
|
@@ -404,10 +401,10 @@ public class FileSystemApplicationHistoryStore extends AbstractService
|
|
if (entry.key.id.startsWith(ConverterUtils.CONTAINER_PREFIX)) {
|
|
if (entry.key.id.startsWith(ConverterUtils.CONTAINER_PREFIX)) {
|
|
if (entry.key.suffix.equals(START_DATA_SUFFIX)) {
|
|
if (entry.key.suffix.equals(START_DATA_SUFFIX)) {
|
|
retrieveStartFinishData(appAttemptId, entry, startFinshDataMap,
|
|
retrieveStartFinishData(appAttemptId, entry, startFinshDataMap,
|
|
- true);
|
|
|
|
|
|
+ true);
|
|
} else if (entry.key.suffix.equals(FINISH_DATA_SUFFIX)) {
|
|
} else if (entry.key.suffix.equals(FINISH_DATA_SUFFIX)) {
|
|
retrieveStartFinishData(appAttemptId, entry, startFinshDataMap,
|
|
retrieveStartFinishData(appAttemptId, entry, startFinshDataMap,
|
|
- false);
|
|
|
|
|
|
+ false);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -420,11 +417,11 @@ public class FileSystemApplicationHistoryStore extends AbstractService
|
|
hfReader.close();
|
|
hfReader.close();
|
|
}
|
|
}
|
|
for (Map.Entry<ContainerId, StartFinishDataPair<ContainerStartData, ContainerFinishData>> entry : startFinshDataMap
|
|
for (Map.Entry<ContainerId, StartFinishDataPair<ContainerStartData, ContainerFinishData>> entry : startFinshDataMap
|
|
- .entrySet()) {
|
|
|
|
|
|
+ .entrySet()) {
|
|
ContainerHistoryData historyData =
|
|
ContainerHistoryData historyData =
|
|
- ContainerHistoryData.newInstance(entry.getKey(), null, null, null,
|
|
|
|
- Long.MIN_VALUE, Long.MAX_VALUE, null, null, Integer.MAX_VALUE,
|
|
|
|
- null);
|
|
|
|
|
|
+ ContainerHistoryData
|
|
|
|
+ .newInstance(entry.getKey(), null, null, null, Long.MIN_VALUE,
|
|
|
|
+ Long.MAX_VALUE, null, null, Integer.MAX_VALUE, null);
|
|
mergeContainerHistoryData(historyData, entry.getValue().startData);
|
|
mergeContainerHistoryData(historyData, entry.getValue().startData);
|
|
mergeContainerHistoryData(historyData, entry.getValue().finishData);
|
|
mergeContainerHistoryData(historyData, entry.getValue().finishData);
|
|
historyDataMap.put(entry.getKey(), historyData);
|
|
historyDataMap.put(entry.getKey(), historyData);
|
|
@@ -439,8 +436,7 @@ public class FileSystemApplicationHistoryStore extends AbstractService
|
|
HistoryFileReader.Entry entry,
|
|
HistoryFileReader.Entry entry,
|
|
Map<ContainerId, StartFinishDataPair<ContainerStartData, ContainerFinishData>> startFinshDataMap,
|
|
Map<ContainerId, StartFinishDataPair<ContainerStartData, ContainerFinishData>> startFinshDataMap,
|
|
boolean start) throws IOException {
|
|
boolean start) throws IOException {
|
|
- ContainerId containerId =
|
|
|
|
- ConverterUtils.toContainerId(entry.key.id);
|
|
|
|
|
|
+ ContainerId containerId = ConverterUtils.toContainerId(entry.key.id);
|
|
if (containerId.getApplicationAttemptId().equals(appAttemptId)) {
|
|
if (containerId.getApplicationAttemptId().equals(appAttemptId)) {
|
|
StartFinishDataPair<ContainerStartData, ContainerFinishData> pair =
|
|
StartFinishDataPair<ContainerStartData, ContainerFinishData> pair =
|
|
startFinshDataMap.get(containerId);
|
|
startFinshDataMap.get(containerId);
|
|
@@ -482,9 +478,8 @@ public class FileSystemApplicationHistoryStore extends AbstractService
|
|
assert appStart instanceof ApplicationStartDataPBImpl;
|
|
assert appStart instanceof ApplicationStartDataPBImpl;
|
|
try {
|
|
try {
|
|
hfWriter.writeHistoryData(new HistoryDataKey(appStart.getApplicationId()
|
|
hfWriter.writeHistoryData(new HistoryDataKey(appStart.getApplicationId()
|
|
- .toString(), START_DATA_SUFFIX),
|
|
|
|
- ((ApplicationStartDataPBImpl) appStart)
|
|
|
|
- .getProto().toByteArray());
|
|
|
|
|
|
+ .toString(), START_DATA_SUFFIX),
|
|
|
|
+ ((ApplicationStartDataPBImpl) appStart).getProto().toByteArray());
|
|
LOG.info("Start information of application "
|
|
LOG.info("Start information of application "
|
|
+ appStart.getApplicationId() + " is written");
|
|
+ appStart.getApplicationId() + " is written");
|
|
} catch (IOException e) {
|
|
} catch (IOException e) {
|
|
@@ -501,10 +496,9 @@ public class FileSystemApplicationHistoryStore extends AbstractService
|
|
getHistoryFileWriter(appFinish.getApplicationId());
|
|
getHistoryFileWriter(appFinish.getApplicationId());
|
|
assert appFinish instanceof ApplicationFinishDataPBImpl;
|
|
assert appFinish instanceof ApplicationFinishDataPBImpl;
|
|
try {
|
|
try {
|
|
- hfWriter.writeHistoryData(
|
|
|
|
- new HistoryDataKey(appFinish.getApplicationId().toString(),
|
|
|
|
- FINISH_DATA_SUFFIX),
|
|
|
|
- ((ApplicationFinishDataPBImpl) appFinish).getProto().toByteArray());
|
|
|
|
|
|
+ hfWriter.writeHistoryData(new HistoryDataKey(appFinish.getApplicationId()
|
|
|
|
+ .toString(), FINISH_DATA_SUFFIX),
|
|
|
|
+ ((ApplicationFinishDataPBImpl) appFinish).getProto().toByteArray());
|
|
LOG.info("Finish information of application "
|
|
LOG.info("Finish information of application "
|
|
+ appFinish.getApplicationId() + " is written");
|
|
+ appFinish.getApplicationId() + " is written");
|
|
} catch (IOException e) {
|
|
} catch (IOException e) {
|
|
@@ -522,15 +516,13 @@ public class FileSystemApplicationHistoryStore extends AbstractService
|
|
ApplicationAttemptStartData appAttemptStart) throws IOException {
|
|
ApplicationAttemptStartData appAttemptStart) throws IOException {
|
|
HistoryFileWriter hfWriter =
|
|
HistoryFileWriter hfWriter =
|
|
getHistoryFileWriter(appAttemptStart.getApplicationAttemptId()
|
|
getHistoryFileWriter(appAttemptStart.getApplicationAttemptId()
|
|
- .getApplicationId());
|
|
|
|
|
|
+ .getApplicationId());
|
|
assert appAttemptStart instanceof ApplicationAttemptStartDataPBImpl;
|
|
assert appAttemptStart instanceof ApplicationAttemptStartDataPBImpl;
|
|
try {
|
|
try {
|
|
- hfWriter.writeHistoryData(
|
|
|
|
- new HistoryDataKey(appAttemptStart.getApplicationAttemptId()
|
|
|
|
- .toString(),
|
|
|
|
- START_DATA_SUFFIX),
|
|
|
|
- ((ApplicationAttemptStartDataPBImpl) appAttemptStart).getProto()
|
|
|
|
- .toByteArray());
|
|
|
|
|
|
+ hfWriter.writeHistoryData(new HistoryDataKey(appAttemptStart
|
|
|
|
+ .getApplicationAttemptId().toString(), START_DATA_SUFFIX),
|
|
|
|
+ ((ApplicationAttemptStartDataPBImpl) appAttemptStart).getProto()
|
|
|
|
+ .toByteArray());
|
|
LOG.info("Start information of application attempt "
|
|
LOG.info("Start information of application attempt "
|
|
+ appAttemptStart.getApplicationAttemptId() + " is written");
|
|
+ appAttemptStart.getApplicationAttemptId() + " is written");
|
|
} catch (IOException e) {
|
|
} catch (IOException e) {
|
|
@@ -545,15 +537,13 @@ public class FileSystemApplicationHistoryStore extends AbstractService
|
|
ApplicationAttemptFinishData appAttemptFinish) throws IOException {
|
|
ApplicationAttemptFinishData appAttemptFinish) throws IOException {
|
|
HistoryFileWriter hfWriter =
|
|
HistoryFileWriter hfWriter =
|
|
getHistoryFileWriter(appAttemptFinish.getApplicationAttemptId()
|
|
getHistoryFileWriter(appAttemptFinish.getApplicationAttemptId()
|
|
- .getApplicationId());
|
|
|
|
|
|
+ .getApplicationId());
|
|
assert appAttemptFinish instanceof ApplicationAttemptFinishDataPBImpl;
|
|
assert appAttemptFinish instanceof ApplicationAttemptFinishDataPBImpl;
|
|
try {
|
|
try {
|
|
- hfWriter.writeHistoryData(
|
|
|
|
- new HistoryDataKey(appAttemptFinish.getApplicationAttemptId()
|
|
|
|
- .toString(),
|
|
|
|
- FINISH_DATA_SUFFIX),
|
|
|
|
- ((ApplicationAttemptFinishDataPBImpl) appAttemptFinish).getProto()
|
|
|
|
- .toByteArray());
|
|
|
|
|
|
+ hfWriter.writeHistoryData(new HistoryDataKey(appAttemptFinish
|
|
|
|
+ .getApplicationAttemptId().toString(), FINISH_DATA_SUFFIX),
|
|
|
|
+ ((ApplicationAttemptFinishDataPBImpl) appAttemptFinish).getProto()
|
|
|
|
+ .toByteArray());
|
|
LOG.info("Finish information of application attempt "
|
|
LOG.info("Finish information of application attempt "
|
|
+ appAttemptFinish.getApplicationAttemptId() + " is written");
|
|
+ appAttemptFinish.getApplicationAttemptId() + " is written");
|
|
} catch (IOException e) {
|
|
} catch (IOException e) {
|
|
@@ -568,14 +558,12 @@ public class FileSystemApplicationHistoryStore extends AbstractService
|
|
throws IOException {
|
|
throws IOException {
|
|
HistoryFileWriter hfWriter =
|
|
HistoryFileWriter hfWriter =
|
|
getHistoryFileWriter(containerStart.getContainerId()
|
|
getHistoryFileWriter(containerStart.getContainerId()
|
|
- .getApplicationAttemptId()
|
|
|
|
- .getApplicationId());
|
|
|
|
|
|
+ .getApplicationAttemptId().getApplicationId());
|
|
assert containerStart instanceof ContainerStartDataPBImpl;
|
|
assert containerStart instanceof ContainerStartDataPBImpl;
|
|
try {
|
|
try {
|
|
- hfWriter.writeHistoryData(
|
|
|
|
- new HistoryDataKey(containerStart.getContainerId().toString(),
|
|
|
|
- START_DATA_SUFFIX),
|
|
|
|
- ((ContainerStartDataPBImpl) containerStart).getProto().toByteArray());
|
|
|
|
|
|
+ hfWriter.writeHistoryData(new HistoryDataKey(containerStart
|
|
|
|
+ .getContainerId().toString(), START_DATA_SUFFIX),
|
|
|
|
+ ((ContainerStartDataPBImpl) containerStart).getProto().toByteArray());
|
|
LOG.info("Start information of container "
|
|
LOG.info("Start information of container "
|
|
+ containerStart.getContainerId() + " is written");
|
|
+ containerStart.getContainerId() + " is written");
|
|
} catch (IOException e) {
|
|
} catch (IOException e) {
|
|
@@ -590,14 +578,12 @@ public class FileSystemApplicationHistoryStore extends AbstractService
|
|
throws IOException {
|
|
throws IOException {
|
|
HistoryFileWriter hfWriter =
|
|
HistoryFileWriter hfWriter =
|
|
getHistoryFileWriter(containerFinish.getContainerId()
|
|
getHistoryFileWriter(containerFinish.getContainerId()
|
|
- .getApplicationAttemptId().getApplicationId());
|
|
|
|
|
|
+ .getApplicationAttemptId().getApplicationId());
|
|
assert containerFinish instanceof ContainerFinishDataPBImpl;
|
|
assert containerFinish instanceof ContainerFinishDataPBImpl;
|
|
try {
|
|
try {
|
|
- hfWriter.writeHistoryData(
|
|
|
|
- new HistoryDataKey(containerFinish.getContainerId().toString(),
|
|
|
|
- FINISH_DATA_SUFFIX),
|
|
|
|
- ((ContainerFinishDataPBImpl) containerFinish).getProto()
|
|
|
|
- .toByteArray());
|
|
|
|
|
|
+ hfWriter.writeHistoryData(new HistoryDataKey(containerFinish
|
|
|
|
+ .getContainerId().toString(), FINISH_DATA_SUFFIX),
|
|
|
|
+ ((ContainerFinishDataPBImpl) containerFinish).getProto().toByteArray());
|
|
LOG.info("Finish information of container "
|
|
LOG.info("Finish information of container "
|
|
+ containerFinish.getContainerId() + " is written");
|
|
+ containerFinish.getContainerId() + " is written");
|
|
} catch (IOException e) {
|
|
} catch (IOException e) {
|
|
@@ -609,43 +595,42 @@ public class FileSystemApplicationHistoryStore extends AbstractService
|
|
private static ApplicationStartData parseApplicationStartData(byte[] value)
|
|
private static ApplicationStartData parseApplicationStartData(byte[] value)
|
|
throws InvalidProtocolBufferException {
|
|
throws InvalidProtocolBufferException {
|
|
return new ApplicationStartDataPBImpl(
|
|
return new ApplicationStartDataPBImpl(
|
|
- ApplicationStartDataProto.parseFrom(value));
|
|
|
|
|
|
+ ApplicationStartDataProto.parseFrom(value));
|
|
}
|
|
}
|
|
|
|
|
|
private static ApplicationFinishData parseApplicationFinishData(byte[] value)
|
|
private static ApplicationFinishData parseApplicationFinishData(byte[] value)
|
|
throws InvalidProtocolBufferException {
|
|
throws InvalidProtocolBufferException {
|
|
return new ApplicationFinishDataPBImpl(
|
|
return new ApplicationFinishDataPBImpl(
|
|
- ApplicationFinishDataProto.parseFrom(value));
|
|
|
|
|
|
+ ApplicationFinishDataProto.parseFrom(value));
|
|
}
|
|
}
|
|
|
|
|
|
private static ApplicationAttemptStartData parseApplicationAttemptStartData(
|
|
private static ApplicationAttemptStartData parseApplicationAttemptStartData(
|
|
byte[] value) throws InvalidProtocolBufferException {
|
|
byte[] value) throws InvalidProtocolBufferException {
|
|
return new ApplicationAttemptStartDataPBImpl(
|
|
return new ApplicationAttemptStartDataPBImpl(
|
|
- ApplicationAttemptStartDataProto.parseFrom(value));
|
|
|
|
|
|
+ ApplicationAttemptStartDataProto.parseFrom(value));
|
|
}
|
|
}
|
|
|
|
|
|
private static ApplicationAttemptFinishData
|
|
private static ApplicationAttemptFinishData
|
|
- parseApplicationAttemptFinishData(
|
|
|
|
- byte[] value) throws InvalidProtocolBufferException {
|
|
|
|
|
|
+ parseApplicationAttemptFinishData(byte[] value)
|
|
|
|
+ throws InvalidProtocolBufferException {
|
|
return new ApplicationAttemptFinishDataPBImpl(
|
|
return new ApplicationAttemptFinishDataPBImpl(
|
|
- ApplicationAttemptFinishDataProto.parseFrom(value));
|
|
|
|
|
|
+ ApplicationAttemptFinishDataProto.parseFrom(value));
|
|
}
|
|
}
|
|
|
|
|
|
private static ContainerStartData parseContainerStartData(byte[] value)
|
|
private static ContainerStartData parseContainerStartData(byte[] value)
|
|
throws InvalidProtocolBufferException {
|
|
throws InvalidProtocolBufferException {
|
|
return new ContainerStartDataPBImpl(
|
|
return new ContainerStartDataPBImpl(
|
|
- ContainerStartDataProto.parseFrom(value));
|
|
|
|
|
|
+ ContainerStartDataProto.parseFrom(value));
|
|
}
|
|
}
|
|
|
|
|
|
private static ContainerFinishData parseContainerFinishData(byte[] value)
|
|
private static ContainerFinishData parseContainerFinishData(byte[] value)
|
|
throws InvalidProtocolBufferException {
|
|
throws InvalidProtocolBufferException {
|
|
return new ContainerFinishDataPBImpl(
|
|
return new ContainerFinishDataPBImpl(
|
|
- ContainerFinishDataProto.parseFrom(value));
|
|
|
|
|
|
+ ContainerFinishDataProto.parseFrom(value));
|
|
}
|
|
}
|
|
|
|
|
|
private static void mergeApplicationHistoryData(
|
|
private static void mergeApplicationHistoryData(
|
|
- ApplicationHistoryData historyData,
|
|
|
|
- ApplicationStartData startData) {
|
|
|
|
|
|
+ ApplicationHistoryData historyData, ApplicationStartData startData) {
|
|
historyData.setApplicationName(startData.getApplicationName());
|
|
historyData.setApplicationName(startData.getApplicationName());
|
|
historyData.setApplicationType(startData.getApplicationType());
|
|
historyData.setApplicationType(startData.getApplicationType());
|
|
historyData.setQueue(startData.getQueue());
|
|
historyData.setQueue(startData.getQueue());
|
|
@@ -655,12 +640,11 @@ public class FileSystemApplicationHistoryStore extends AbstractService
|
|
}
|
|
}
|
|
|
|
|
|
private static void mergeApplicationHistoryData(
|
|
private static void mergeApplicationHistoryData(
|
|
- ApplicationHistoryData historyData,
|
|
|
|
- ApplicationFinishData finishData) {
|
|
|
|
|
|
+ ApplicationHistoryData historyData, ApplicationFinishData finishData) {
|
|
historyData.setFinishTime(finishData.getFinishTime());
|
|
historyData.setFinishTime(finishData.getFinishTime());
|
|
historyData.setDiagnosticsInfo(finishData.getDiagnosticsInfo());
|
|
historyData.setDiagnosticsInfo(finishData.getDiagnosticsInfo());
|
|
historyData.setFinalApplicationStatus(finishData
|
|
historyData.setFinalApplicationStatus(finishData
|
|
- .getFinalApplicationStatus());
|
|
|
|
|
|
+ .getFinalApplicationStatus());
|
|
historyData.setYarnApplicationState(finishData.getYarnApplicationState());
|
|
historyData.setYarnApplicationState(finishData.getYarnApplicationState());
|
|
}
|
|
}
|
|
|
|
|
|
@@ -678,9 +662,9 @@ public class FileSystemApplicationHistoryStore extends AbstractService
|
|
historyData.setDiagnosticsInfo(finishData.getDiagnosticsInfo());
|
|
historyData.setDiagnosticsInfo(finishData.getDiagnosticsInfo());
|
|
historyData.setTrackingURL(finishData.getTrackingURL());
|
|
historyData.setTrackingURL(finishData.getTrackingURL());
|
|
historyData.setFinalApplicationStatus(finishData
|
|
historyData.setFinalApplicationStatus(finishData
|
|
- .getFinalApplicationStatus());
|
|
|
|
|
|
+ .getFinalApplicationStatus());
|
|
historyData.setYarnApplicationAttemptState(finishData
|
|
historyData.setYarnApplicationAttemptState(finishData
|
|
- .getYarnApplicationAttemptState());
|
|
|
|
|
|
+ .getYarnApplicationAttemptState());
|
|
}
|
|
}
|
|
|
|
|
|
private static void mergeContainerHistoryData(
|
|
private static void mergeContainerHistoryData(
|
|
@@ -696,8 +680,7 @@ public class FileSystemApplicationHistoryStore extends AbstractService
|
|
historyData.setFinishTime(finishData.getFinishTime());
|
|
historyData.setFinishTime(finishData.getFinishTime());
|
|
historyData.setDiagnosticsInfo(finishData.getDiagnosticsInfo());
|
|
historyData.setDiagnosticsInfo(finishData.getDiagnosticsInfo());
|
|
historyData.setLogURL(finishData.getLogURL());
|
|
historyData.setLogURL(finishData.getLogURL());
|
|
- historyData.setContainerExitStatus(finishData
|
|
|
|
- .getContainerExitStatus());
|
|
|
|
|
|
+ historyData.setContainerExitStatus(finishData.getContainerExitStatus());
|
|
historyData.setContainerState(finishData.getContainerState());
|
|
historyData.setContainerState(finishData.getContainerState());
|
|
}
|
|
}
|
|
|
|
|
|
@@ -746,7 +729,7 @@ public class FileSystemApplicationHistoryStore extends AbstractService
|
|
FSDataInputStream fsdis = fs.open(historyFile);
|
|
FSDataInputStream fsdis = fs.open(historyFile);
|
|
reader =
|
|
reader =
|
|
new TFile.Reader(fsdis, fs.getFileStatus(historyFile).getLen(),
|
|
new TFile.Reader(fsdis, fs.getFileStatus(historyFile).getLen(),
|
|
- getConfig());
|
|
|
|
|
|
+ getConfig());
|
|
reset();
|
|
reset();
|
|
}
|
|
}
|
|
|
|
|
|
@@ -782,8 +765,7 @@ public class FileSystemApplicationHistoryStore extends AbstractService
|
|
private FSDataOutputStream fsdos;
|
|
private FSDataOutputStream fsdos;
|
|
private TFile.Writer writer;
|
|
private TFile.Writer writer;
|
|
|
|
|
|
- public HistoryFileWriter(Path historyFile)
|
|
|
|
- throws IOException {
|
|
|
|
|
|
+ public HistoryFileWriter(Path historyFile) throws IOException {
|
|
if (fs.exists(historyFile)) {
|
|
if (fs.exists(historyFile)) {
|
|
fsdos = fs.append(historyFile);
|
|
fsdos = fs.append(historyFile);
|
|
} else {
|
|
} else {
|
|
@@ -792,9 +774,9 @@ public class FileSystemApplicationHistoryStore extends AbstractService
|
|
fs.setPermission(historyFile, HISTORY_FILE_UMASK);
|
|
fs.setPermission(historyFile, HISTORY_FILE_UMASK);
|
|
writer =
|
|
writer =
|
|
new TFile.Writer(fsdos, MIN_BLOCK_SIZE, getConfig().get(
|
|
new TFile.Writer(fsdos, MIN_BLOCK_SIZE, getConfig().get(
|
|
- YarnConfiguration.FS_HISTORY_STORE_COMPRESSION_TYPE,
|
|
|
|
- YarnConfiguration.DEFAULT_FS_HISTORY_STORE_COMPRESSION_TYPE),
|
|
|
|
- null, getConfig());
|
|
|
|
|
|
+ YarnConfiguration.FS_HISTORY_STORE_COMPRESSION_TYPE,
|
|
|
|
+ YarnConfiguration.DEFAULT_FS_HISTORY_STORE_COMPRESSION_TYPE), null,
|
|
|
|
+ getConfig());
|
|
}
|
|
}
|
|
|
|
|
|
public synchronized void close() {
|
|
public synchronized void close() {
|