|
@@ -26,6 +26,7 @@ import org.apache.hadoop.classification.InterfaceStability;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.hbase.client.Connection;
|
|
|
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
|
|
+import org.apache.hadoop.security.UserGroupInformation;
|
|
|
import org.apache.hadoop.service.AbstractService;
|
|
|
import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
|
|
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
|
|
@@ -34,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
|
|
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
|
|
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
|
|
|
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
|
|
|
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
|
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn;
|
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
|
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
|
|
@@ -63,6 +65,10 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn;
|
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix;
|
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;
|
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
|
|
|
+import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationColumn;
|
|
|
+import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationColumnPrefix;
|
|
|
+import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationRowKey;
|
|
|
+import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationTable;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
@@ -85,6 +91,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
|
|
|
private TypedBufferedMutator<ApplicationTable> applicationTable;
|
|
|
private TypedBufferedMutator<FlowActivityTable> flowActivityTable;
|
|
|
private TypedBufferedMutator<FlowRunTable> flowRunTable;
|
|
|
+ private TypedBufferedMutator<SubApplicationTable> subApplicationTable;
|
|
|
|
|
|
/**
|
|
|
* Used to convert strings key components to and from storage format.
|
|
@@ -97,6 +104,10 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
|
|
|
*/
|
|
|
private final KeyConverter<Long> longKeyConverter = new LongKeyConverter();
|
|
|
|
|
|
+ private enum Tables {
|
|
|
+ APPLICATION_TABLE, ENTITY_TABLE, SUBAPPLICATION_TABLE
|
|
|
+ };
|
|
|
+
|
|
|
public HBaseTimelineWriterImpl() {
|
|
|
super(HBaseTimelineWriterImpl.class.getName());
|
|
|
}
|
|
@@ -116,17 +127,28 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
|
|
|
flowRunTable = new FlowRunTable().getTableMutator(hbaseConf, conn);
|
|
|
flowActivityTable =
|
|
|
new FlowActivityTable().getTableMutator(hbaseConf, conn);
|
|
|
+ subApplicationTable =
|
|
|
+ new SubApplicationTable().getTableMutator(hbaseConf, conn);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Stores the entire information in TimelineEntities to the timeline store.
|
|
|
*/
|
|
|
@Override
|
|
|
- public TimelineWriteResponse write(String clusterId, String userId,
|
|
|
- String flowName, String flowVersion, long flowRunId, String appId,
|
|
|
- TimelineEntities data) throws IOException {
|
|
|
+ public TimelineWriteResponse write(TimelineCollectorContext context,
|
|
|
+ TimelineEntities data, UserGroupInformation callerUgi)
|
|
|
+ throws IOException {
|
|
|
|
|
|
TimelineWriteResponse putStatus = new TimelineWriteResponse();
|
|
|
+
|
|
|
+ String clusterId = context.getClusterId();
|
|
|
+ String userId = context.getUserId();
|
|
|
+ String flowName = context.getFlowName();
|
|
|
+ String flowVersion = context.getFlowVersion();
|
|
|
+ long flowRunId = context.getFlowRunId();
|
|
|
+ String appId = context.getAppId();
|
|
|
+ String subApplicationUser = callerUgi.getShortUserName();
|
|
|
+
|
|
|
// defensive coding to avoid NPE during row key construction
|
|
|
if ((flowName == null) || (appId == null) || (clusterId == null)
|
|
|
|| (userId == null)) {
|
|
@@ -152,18 +174,22 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
|
|
|
new ApplicationRowKey(clusterId, userId, flowName, flowRunId,
|
|
|
appId);
|
|
|
rowKey = applicationRowKey.getRowKey();
|
|
|
+ store(rowKey, te, flowVersion, Tables.APPLICATION_TABLE);
|
|
|
} else {
|
|
|
EntityRowKey entityRowKey =
|
|
|
new EntityRowKey(clusterId, userId, flowName, flowRunId, appId,
|
|
|
te.getType(), te.getIdPrefix(), te.getId());
|
|
|
rowKey = entityRowKey.getRowKey();
|
|
|
+ store(rowKey, te, flowVersion, Tables.ENTITY_TABLE);
|
|
|
}
|
|
|
|
|
|
- storeInfo(rowKey, te, flowVersion, isApplication);
|
|
|
- storeEvents(rowKey, te.getEvents(), isApplication);
|
|
|
- storeConfig(rowKey, te.getConfigs(), isApplication);
|
|
|
- storeMetrics(rowKey, te.getMetrics(), isApplication);
|
|
|
- storeRelations(rowKey, te, isApplication);
|
|
|
+ if (!isApplication && !userId.equals(subApplicationUser)) {
|
|
|
+ SubApplicationRowKey subApplicationRowKey =
|
|
|
+ new SubApplicationRowKey(subApplicationUser, clusterId,
|
|
|
+ te.getType(), te.getIdPrefix(), te.getId(), userId);
|
|
|
+ rowKey = subApplicationRowKey.getRowKey();
|
|
|
+ store(rowKey, te, flowVersion, Tables.SUBAPPLICATION_TABLE);
|
|
|
+ }
|
|
|
|
|
|
if (isApplication) {
|
|
|
TimelineEvent event =
|
|
@@ -304,72 +330,108 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private void storeRelations(byte[] rowKey, TimelineEntity te,
|
|
|
- boolean isApplication) throws IOException {
|
|
|
- if (isApplication) {
|
|
|
- storeRelations(rowKey, te.getIsRelatedToEntities(),
|
|
|
- ApplicationColumnPrefix.IS_RELATED_TO, applicationTable);
|
|
|
- storeRelations(rowKey, te.getRelatesToEntities(),
|
|
|
- ApplicationColumnPrefix.RELATES_TO, applicationTable);
|
|
|
- } else {
|
|
|
- storeRelations(rowKey, te.getIsRelatedToEntities(),
|
|
|
- EntityColumnPrefix.IS_RELATED_TO, entityTable);
|
|
|
- storeRelations(rowKey, te.getRelatesToEntities(),
|
|
|
- EntityColumnPrefix.RELATES_TO, entityTable);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* Stores the Relations from the {@linkplain TimelineEntity} object.
|
|
|
*/
|
|
|
private <T> void storeRelations(byte[] rowKey,
|
|
|
- Map<String, Set<String>> connectedEntities,
|
|
|
- ColumnPrefix<T> columnPrefix, TypedBufferedMutator<T> table)
|
|
|
- throws IOException {
|
|
|
- for (Map.Entry<String, Set<String>> connectedEntity : connectedEntities
|
|
|
- .entrySet()) {
|
|
|
- // id3?id4?id5
|
|
|
- String compoundValue =
|
|
|
- Separator.VALUES.joinEncoded(connectedEntity.getValue());
|
|
|
- columnPrefix.store(rowKey, table,
|
|
|
- stringKeyConverter.encode(connectedEntity.getKey()), null,
|
|
|
- compoundValue);
|
|
|
+ Map<String, Set<String>> connectedEntities, ColumnPrefix<T> columnPrefix,
|
|
|
+ TypedBufferedMutator<T> table) throws IOException {
|
|
|
+ if (connectedEntities != null) {
|
|
|
+ for (Map.Entry<String, Set<String>> connectedEntity : connectedEntities
|
|
|
+ .entrySet()) {
|
|
|
+ // id3?id4?id5
|
|
|
+ String compoundValue =
|
|
|
+ Separator.VALUES.joinEncoded(connectedEntity.getValue());
|
|
|
+ columnPrefix.store(rowKey, table,
|
|
|
+ stringKeyConverter.encode(connectedEntity.getKey()), null,
|
|
|
+ compoundValue);
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Stores information from the {@linkplain TimelineEntity} object.
|
|
|
*/
|
|
|
- private void storeInfo(byte[] rowKey, TimelineEntity te, String flowVersion,
|
|
|
- boolean isApplication) throws IOException {
|
|
|
-
|
|
|
- if (isApplication) {
|
|
|
+ private void store(byte[] rowKey, TimelineEntity te,
|
|
|
+ String flowVersion,
|
|
|
+ Tables table) throws IOException {
|
|
|
+ switch (table) {
|
|
|
+ case APPLICATION_TABLE:
|
|
|
ApplicationColumn.ID.store(rowKey, applicationTable, null, te.getId());
|
|
|
ApplicationColumn.CREATED_TIME.store(rowKey, applicationTable, null,
|
|
|
te.getCreatedTime());
|
|
|
ApplicationColumn.FLOW_VERSION.store(rowKey, applicationTable, null,
|
|
|
flowVersion);
|
|
|
- Map<String, Object> info = te.getInfo();
|
|
|
- if (info != null) {
|
|
|
- for (Map.Entry<String, Object> entry : info.entrySet()) {
|
|
|
- ApplicationColumnPrefix.INFO.store(rowKey, applicationTable,
|
|
|
- stringKeyConverter.encode(entry.getKey()), null,
|
|
|
- entry.getValue());
|
|
|
- }
|
|
|
- }
|
|
|
- } else {
|
|
|
+ storeInfo(rowKey, te.getInfo(), flowVersion, ApplicationColumnPrefix.INFO,
|
|
|
+ applicationTable);
|
|
|
+ storeMetrics(rowKey, te.getMetrics(), ApplicationColumnPrefix.METRIC,
|
|
|
+ applicationTable);
|
|
|
+ storeEvents(rowKey, te.getEvents(), ApplicationColumnPrefix.EVENT,
|
|
|
+ applicationTable);
|
|
|
+ storeConfig(rowKey, te.getConfigs(), ApplicationColumnPrefix.CONFIG,
|
|
|
+ applicationTable);
|
|
|
+ storeRelations(rowKey, te.getIsRelatedToEntities(),
|
|
|
+ ApplicationColumnPrefix.IS_RELATED_TO, applicationTable);
|
|
|
+ storeRelations(rowKey, te.getRelatesToEntities(),
|
|
|
+ ApplicationColumnPrefix.RELATES_TO, applicationTable);
|
|
|
+ break;
|
|
|
+ case ENTITY_TABLE:
|
|
|
EntityColumn.ID.store(rowKey, entityTable, null, te.getId());
|
|
|
EntityColumn.TYPE.store(rowKey, entityTable, null, te.getType());
|
|
|
EntityColumn.CREATED_TIME.store(rowKey, entityTable, null,
|
|
|
te.getCreatedTime());
|
|
|
EntityColumn.FLOW_VERSION.store(rowKey, entityTable, null, flowVersion);
|
|
|
- Map<String, Object> info = te.getInfo();
|
|
|
- if (info != null) {
|
|
|
- for (Map.Entry<String, Object> entry : info.entrySet()) {
|
|
|
- EntityColumnPrefix.INFO.store(rowKey, entityTable,
|
|
|
- stringKeyConverter.encode(entry.getKey()), null,
|
|
|
- entry.getValue());
|
|
|
- }
|
|
|
+ storeInfo(rowKey, te.getInfo(), flowVersion, EntityColumnPrefix.INFO,
|
|
|
+ entityTable);
|
|
|
+ storeMetrics(rowKey, te.getMetrics(), EntityColumnPrefix.METRIC,
|
|
|
+ entityTable);
|
|
|
+ storeEvents(rowKey, te.getEvents(), EntityColumnPrefix.EVENT,
|
|
|
+ entityTable);
|
|
|
+ storeConfig(rowKey, te.getConfigs(), EntityColumnPrefix.CONFIG,
|
|
|
+ entityTable);
|
|
|
+ storeRelations(rowKey, te.getIsRelatedToEntities(),
|
|
|
+ EntityColumnPrefix.IS_RELATED_TO, entityTable);
|
|
|
+ storeRelations(rowKey, te.getRelatesToEntities(),
|
|
|
+ EntityColumnPrefix.RELATES_TO, entityTable);
|
|
|
+ break;
|
|
|
+ case SUBAPPLICATION_TABLE:
|
|
|
+ SubApplicationColumn.ID.store(rowKey, subApplicationTable, null,
|
|
|
+ te.getId());
|
|
|
+ SubApplicationColumn.TYPE.store(rowKey, subApplicationTable, null,
|
|
|
+ te.getType());
|
|
|
+ SubApplicationColumn.CREATED_TIME.store(rowKey, subApplicationTable, null,
|
|
|
+ te.getCreatedTime());
|
|
|
+ SubApplicationColumn.FLOW_VERSION.store(rowKey, subApplicationTable, null,
|
|
|
+ flowVersion);
|
|
|
+ storeInfo(rowKey, te.getInfo(), flowVersion,
|
|
|
+ SubApplicationColumnPrefix.INFO, subApplicationTable);
|
|
|
+ storeMetrics(rowKey, te.getMetrics(), SubApplicationColumnPrefix.METRIC,
|
|
|
+ subApplicationTable);
|
|
|
+ storeEvents(rowKey, te.getEvents(), SubApplicationColumnPrefix.EVENT,
|
|
|
+ subApplicationTable);
|
|
|
+ storeConfig(rowKey, te.getConfigs(), SubApplicationColumnPrefix.CONFIG,
|
|
|
+ subApplicationTable);
|
|
|
+ storeRelations(rowKey, te.getIsRelatedToEntities(),
|
|
|
+ SubApplicationColumnPrefix.IS_RELATED_TO, subApplicationTable);
|
|
|
+ storeRelations(rowKey, te.getRelatesToEntities(),
|
|
|
+ SubApplicationColumnPrefix.RELATES_TO, subApplicationTable);
|
|
|
+ break;
|
|
|
+ default:
|
|
|
+ LOG.info("Invalid table name provided.");
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * stores the info information from {@linkplain TimelineEntity}.
|
|
|
+ */
|
|
|
+ private <T> void storeInfo(byte[] rowKey, Map<String, Object> info,
|
|
|
+ String flowVersion, ColumnPrefix<T> columnPrefix,
|
|
|
+ TypedBufferedMutator<T> table) throws IOException {
|
|
|
+ if (info != null) {
|
|
|
+ for (Map.Entry<String, Object> entry : info.entrySet()) {
|
|
|
+ columnPrefix.store(rowKey, table,
|
|
|
+ stringKeyConverter.encode(entry.getKey()), null, entry.getValue());
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -377,19 +439,13 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
|
|
|
/**
|
|
|
* stores the config information from {@linkplain TimelineEntity}.
|
|
|
*/
|
|
|
- private void storeConfig(byte[] rowKey, Map<String, String> config,
|
|
|
- boolean isApplication) throws IOException {
|
|
|
- if (config == null) {
|
|
|
- return;
|
|
|
- }
|
|
|
- for (Map.Entry<String, String> entry : config.entrySet()) {
|
|
|
- byte[] configKey = stringKeyConverter.encode(entry.getKey());
|
|
|
- if (isApplication) {
|
|
|
- ApplicationColumnPrefix.CONFIG.store(rowKey, applicationTable,
|
|
|
- configKey, null, entry.getValue());
|
|
|
- } else {
|
|
|
- EntityColumnPrefix.CONFIG.store(rowKey, entityTable, configKey,
|
|
|
- null, entry.getValue());
|
|
|
+ private <T> void storeConfig(byte[] rowKey, Map<String, String> config,
|
|
|
+ ColumnPrefix<T> columnPrefix, TypedBufferedMutator<T> table)
|
|
|
+ throws IOException {
|
|
|
+ if (config != null) {
|
|
|
+ for (Map.Entry<String, String> entry : config.entrySet()) {
|
|
|
+ byte[] configKey = stringKeyConverter.encode(entry.getKey());
|
|
|
+ columnPrefix.store(rowKey, table, configKey, null, entry.getValue());
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -398,8 +454,9 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
|
|
|
* stores the {@linkplain TimelineMetric} information from the
|
|
|
* {@linkplain TimelineEvent} object.
|
|
|
*/
|
|
|
- private void storeMetrics(byte[] rowKey, Set<TimelineMetric> metrics,
|
|
|
- boolean isApplication) throws IOException {
|
|
|
+ private <T> void storeMetrics(byte[] rowKey, Set<TimelineMetric> metrics,
|
|
|
+ ColumnPrefix<T> columnPrefix, TypedBufferedMutator<T> table)
|
|
|
+ throws IOException {
|
|
|
if (metrics != null) {
|
|
|
for (TimelineMetric metric : metrics) {
|
|
|
byte[] metricColumnQualifier =
|
|
@@ -407,13 +464,8 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
|
|
|
Map<Long, Number> timeseries = metric.getValues();
|
|
|
for (Map.Entry<Long, Number> timeseriesEntry : timeseries.entrySet()) {
|
|
|
Long timestamp = timeseriesEntry.getKey();
|
|
|
- if (isApplication) {
|
|
|
- ApplicationColumnPrefix.METRIC.store(rowKey, applicationTable,
|
|
|
- metricColumnQualifier, timestamp, timeseriesEntry.getValue());
|
|
|
- } else {
|
|
|
- EntityColumnPrefix.METRIC.store(rowKey, entityTable,
|
|
|
- metricColumnQualifier, timestamp, timeseriesEntry.getValue());
|
|
|
- }
|
|
|
+ columnPrefix.store(rowKey, table, metricColumnQualifier, timestamp,
|
|
|
+ timeseriesEntry.getValue());
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -422,8 +474,9 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
|
|
|
/**
|
|
|
* Stores the events from the {@linkplain TimelineEvent} object.
|
|
|
*/
|
|
|
- private void storeEvents(byte[] rowKey, Set<TimelineEvent> events,
|
|
|
- boolean isApplication) throws IOException {
|
|
|
+ private <T> void storeEvents(byte[] rowKey, Set<TimelineEvent> events,
|
|
|
+ ColumnPrefix<T> columnPrefix, TypedBufferedMutator<T> table)
|
|
|
+ throws IOException {
|
|
|
if (events != null) {
|
|
|
for (TimelineEvent event : events) {
|
|
|
if (event != null) {
|
|
@@ -441,26 +494,16 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
|
|
|
byte[] columnQualifierBytes =
|
|
|
new EventColumnName(eventId, eventTimestamp, null)
|
|
|
.getColumnQualifier();
|
|
|
- if (isApplication) {
|
|
|
- ApplicationColumnPrefix.EVENT.store(rowKey, applicationTable,
|
|
|
- columnQualifierBytes, null, Separator.EMPTY_BYTES);
|
|
|
- } else {
|
|
|
- EntityColumnPrefix.EVENT.store(rowKey, entityTable,
|
|
|
- columnQualifierBytes, null, Separator.EMPTY_BYTES);
|
|
|
- }
|
|
|
+ columnPrefix.store(rowKey, table, columnQualifierBytes, null,
|
|
|
+ Separator.EMPTY_BYTES);
|
|
|
} else {
|
|
|
for (Map.Entry<String, Object> info : eventInfo.entrySet()) {
|
|
|
// eventId=infoKey
|
|
|
byte[] columnQualifierBytes =
|
|
|
new EventColumnName(eventId, eventTimestamp, info.getKey())
|
|
|
.getColumnQualifier();
|
|
|
- if (isApplication) {
|
|
|
- ApplicationColumnPrefix.EVENT.store(rowKey, applicationTable,
|
|
|
- columnQualifierBytes, null, info.getValue());
|
|
|
- } else {
|
|
|
- EntityColumnPrefix.EVENT.store(rowKey, entityTable,
|
|
|
- columnQualifierBytes, null, info.getValue());
|
|
|
- }
|
|
|
+ columnPrefix.store(rowKey, table, columnQualifierBytes, null,
|
|
|
+ info.getValue());
|
|
|
} // for info: eventInfo
|
|
|
}
|
|
|
}
|
|
@@ -500,6 +543,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
|
|
|
applicationTable.flush();
|
|
|
flowRunTable.flush();
|
|
|
flowActivityTable.flush();
|
|
|
+ subApplicationTable.flush();
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -532,11 +576,13 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
|
|
|
// The close API performs flushing and releases any resources held
|
|
|
flowActivityTable.close();
|
|
|
}
|
|
|
+ if (subApplicationTable != null) {
|
|
|
+ subApplicationTable.close();
|
|
|
+ }
|
|
|
if (conn != null) {
|
|
|
LOG.info("closing the hbase Connection");
|
|
|
conn.close();
|
|
|
}
|
|
|
super.serviceStop();
|
|
|
}
|
|
|
-
|
|
|
}
|