|
@@ -40,10 +40,14 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.application.Applica
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
|
|
|
|
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTableRW;
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumnPrefix;
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumnPrefix;
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
|
|
|
|
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTableRW;
|
|
|
|
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
|
|
|
|
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnRWHelper;
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnName;
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnName;
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
|
|
@@ -55,20 +59,24 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
|
|
|
|
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTableRW;
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation;
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation;
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix;
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix;
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
|
|
|
|
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTableRW;
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn;
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn;
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix;
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix;
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
|
|
|
|
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTableRW;
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationColumn;
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationColumn;
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationColumnPrefix;
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationColumnPrefix;
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationRowKey;
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationRowKey;
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationTable;
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationTable;
|
|
|
|
+import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationTableRW;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.LoggerFactory;
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
@@ -121,14 +129,15 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
|
|
Configuration hbaseConf =
|
|
Configuration hbaseConf =
|
|
HBaseTimelineStorageUtils.getTimelineServiceHBaseConf(conf);
|
|
HBaseTimelineStorageUtils.getTimelineServiceHBaseConf(conf);
|
|
conn = ConnectionFactory.createConnection(hbaseConf);
|
|
conn = ConnectionFactory.createConnection(hbaseConf);
|
|
- entityTable = new EntityTable().getTableMutator(hbaseConf, conn);
|
|
|
|
- appToFlowTable = new AppToFlowTable().getTableMutator(hbaseConf, conn);
|
|
|
|
- applicationTable = new ApplicationTable().getTableMutator(hbaseConf, conn);
|
|
|
|
- flowRunTable = new FlowRunTable().getTableMutator(hbaseConf, conn);
|
|
|
|
|
|
+ entityTable = new EntityTableRW().getTableMutator(hbaseConf, conn);
|
|
|
|
+ appToFlowTable = new AppToFlowTableRW().getTableMutator(hbaseConf, conn);
|
|
|
|
+ applicationTable =
|
|
|
|
+ new ApplicationTableRW().getTableMutator(hbaseConf, conn);
|
|
|
|
+ flowRunTable = new FlowRunTableRW().getTableMutator(hbaseConf, conn);
|
|
flowActivityTable =
|
|
flowActivityTable =
|
|
- new FlowActivityTable().getTableMutator(hbaseConf, conn);
|
|
|
|
|
|
+ new FlowActivityTableRW().getTableMutator(hbaseConf, conn);
|
|
subApplicationTable =
|
|
subApplicationTable =
|
|
- new SubApplicationTable().getTableMutator(hbaseConf, conn);
|
|
|
|
|
|
+ new SubApplicationTableRW().getTableMutator(hbaseConf, conn);
|
|
|
|
|
|
UserGroupInformation ugi = UserGroupInformation.isSecurityEnabled() ?
|
|
UserGroupInformation ugi = UserGroupInformation.isSecurityEnabled() ?
|
|
UserGroupInformation.getLoginUser() :
|
|
UserGroupInformation.getLoginUser() :
|
|
@@ -232,12 +241,12 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
|
|
// store in App to flow table
|
|
// store in App to flow table
|
|
AppToFlowRowKey appToFlowRowKey = new AppToFlowRowKey(appId);
|
|
AppToFlowRowKey appToFlowRowKey = new AppToFlowRowKey(appId);
|
|
byte[] rowKey = appToFlowRowKey.getRowKey();
|
|
byte[] rowKey = appToFlowRowKey.getRowKey();
|
|
- AppToFlowColumnPrefix.FLOW_NAME.store(rowKey, appToFlowTable, clusterId,
|
|
|
|
- null, flowName);
|
|
|
|
- AppToFlowColumnPrefix.FLOW_RUN_ID.store(rowKey, appToFlowTable, clusterId,
|
|
|
|
- null, flowRunId);
|
|
|
|
- AppToFlowColumnPrefix.USER_ID.store(rowKey, appToFlowTable, clusterId, null,
|
|
|
|
- userId);
|
|
|
|
|
|
+ ColumnRWHelper.store(rowKey, appToFlowTable,
|
|
|
|
+ AppToFlowColumnPrefix.FLOW_NAME, clusterId, null, flowName);
|
|
|
|
+ ColumnRWHelper.store(rowKey, appToFlowTable,
|
|
|
|
+ AppToFlowColumnPrefix.FLOW_RUN_ID, clusterId, null, flowRunId);
|
|
|
|
+ ColumnRWHelper.store(rowKey, appToFlowTable, AppToFlowColumnPrefix.USER_ID,
|
|
|
|
+ clusterId, null, userId);
|
|
|
|
|
|
// store in flow run table
|
|
// store in flow run table
|
|
storeAppCreatedInFlowRunTable(flowRunRowKey, appId, te);
|
|
storeAppCreatedInFlowRunTable(flowRunRowKey, appId, te);
|
|
@@ -248,8 +257,8 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
|
|
appCreatedTimeStamp, flowRunRowKey.getUserId(), flowName)
|
|
appCreatedTimeStamp, flowRunRowKey.getUserId(), flowName)
|
|
.getRowKey();
|
|
.getRowKey();
|
|
byte[] qualifier = longKeyConverter.encode(flowRunRowKey.getFlowRunId());
|
|
byte[] qualifier = longKeyConverter.encode(flowRunRowKey.getFlowRunId());
|
|
- FlowActivityColumnPrefix.RUN_ID.store(flowActivityRowKeyBytes,
|
|
|
|
- flowActivityTable, qualifier, null, flowVersion,
|
|
|
|
|
|
+ ColumnRWHelper.store(flowActivityRowKeyBytes, flowActivityTable,
|
|
|
|
+ FlowActivityColumnPrefix.RUN_ID, qualifier, null, flowVersion,
|
|
AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId));
|
|
AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId));
|
|
}
|
|
}
|
|
|
|
|
|
@@ -259,8 +268,8 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
|
|
private void storeAppCreatedInFlowRunTable(FlowRunRowKey flowRunRowKey,
|
|
private void storeAppCreatedInFlowRunTable(FlowRunRowKey flowRunRowKey,
|
|
String appId, TimelineEntity te) throws IOException {
|
|
String appId, TimelineEntity te) throws IOException {
|
|
byte[] rowKey = flowRunRowKey.getRowKey();
|
|
byte[] rowKey = flowRunRowKey.getRowKey();
|
|
- FlowRunColumn.MIN_START_TIME.store(rowKey, flowRunTable, null,
|
|
|
|
- te.getCreatedTime(),
|
|
|
|
|
|
+ ColumnRWHelper.store(rowKey, flowRunTable, FlowRunColumn.MIN_START_TIME,
|
|
|
|
+ null, te.getCreatedTime(),
|
|
AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId));
|
|
AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId));
|
|
}
|
|
}
|
|
|
|
|
|
@@ -282,8 +291,8 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
|
|
appFinishedTimeStamp, flowRunRowKey.getUserId(),
|
|
appFinishedTimeStamp, flowRunRowKey.getUserId(),
|
|
flowRunRowKey.getFlowName()).getRowKey();
|
|
flowRunRowKey.getFlowName()).getRowKey();
|
|
byte[] qualifier = longKeyConverter.encode(flowRunRowKey.getFlowRunId());
|
|
byte[] qualifier = longKeyConverter.encode(flowRunRowKey.getFlowRunId());
|
|
- FlowActivityColumnPrefix.RUN_ID.store(rowKey, flowActivityTable, qualifier,
|
|
|
|
- null, flowVersion,
|
|
|
|
|
|
+ ColumnRWHelper.store(rowKey, flowActivityTable,
|
|
|
|
+ FlowActivityColumnPrefix.RUN_ID, qualifier, null, flowVersion,
|
|
AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId));
|
|
AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId));
|
|
}
|
|
}
|
|
|
|
|
|
@@ -296,8 +305,8 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
|
|
byte[] rowKey = flowRunRowKey.getRowKey();
|
|
byte[] rowKey = flowRunRowKey.getRowKey();
|
|
Attribute attributeAppId =
|
|
Attribute attributeAppId =
|
|
AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId);
|
|
AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId);
|
|
- FlowRunColumn.MAX_END_TIME.store(rowKey, flowRunTable, null,
|
|
|
|
- appFinishedTimeStamp, attributeAppId);
|
|
|
|
|
|
+ ColumnRWHelper.store(rowKey, flowRunTable, FlowRunColumn.MAX_END_TIME,
|
|
|
|
+ null, appFinishedTimeStamp, attributeAppId);
|
|
|
|
|
|
// store the final value of metrics since application has finished
|
|
// store the final value of metrics since application has finished
|
|
Set<TimelineMetric> metrics = te.getMetrics();
|
|
Set<TimelineMetric> metrics = te.getMetrics();
|
|
@@ -328,7 +337,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
|
|
Map<Long, Number> timeseries = metric.getValues();
|
|
Map<Long, Number> timeseries = metric.getValues();
|
|
for (Map.Entry<Long, Number> timeseriesEntry : timeseries.entrySet()) {
|
|
for (Map.Entry<Long, Number> timeseriesEntry : timeseries.entrySet()) {
|
|
Long timestamp = timeseriesEntry.getKey();
|
|
Long timestamp = timeseriesEntry.getKey();
|
|
- FlowRunColumnPrefix.METRIC.store(rowKey, flowRunTable,
|
|
|
|
|
|
+ ColumnRWHelper.store(rowKey, flowRunTable, FlowRunColumnPrefix.METRIC,
|
|
metricColumnQualifier, timestamp, timeseriesEntry.getValue(),
|
|
metricColumnQualifier, timestamp, timeseriesEntry.getValue(),
|
|
attributes);
|
|
attributes);
|
|
}
|
|
}
|
|
@@ -338,7 +347,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
|
|
/**
|
|
/**
|
|
* Stores the Relations from the {@linkplain TimelineEntity} object.
|
|
* Stores the Relations from the {@linkplain TimelineEntity} object.
|
|
*/
|
|
*/
|
|
- private <T> void storeRelations(byte[] rowKey,
|
|
|
|
|
|
+ private <T extends BaseTable<T>> void storeRelations(byte[] rowKey,
|
|
Map<String, Set<String>> connectedEntities, ColumnPrefix<T> columnPrefix,
|
|
Map<String, Set<String>> connectedEntities, ColumnPrefix<T> columnPrefix,
|
|
TypedBufferedMutator<T> table) throws IOException {
|
|
TypedBufferedMutator<T> table) throws IOException {
|
|
if (connectedEntities != null) {
|
|
if (connectedEntities != null) {
|
|
@@ -347,9 +356,9 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
|
|
// id3?id4?id5
|
|
// id3?id4?id5
|
|
String compoundValue =
|
|
String compoundValue =
|
|
Separator.VALUES.joinEncoded(connectedEntity.getValue());
|
|
Separator.VALUES.joinEncoded(connectedEntity.getValue());
|
|
- columnPrefix.store(rowKey, table,
|
|
|
|
- stringKeyConverter.encode(connectedEntity.getKey()), null,
|
|
|
|
- compoundValue);
|
|
|
|
|
|
+ ColumnRWHelper.store(rowKey, table, columnPrefix,
|
|
|
|
+ stringKeyConverter.encode(connectedEntity.getKey()),
|
|
|
|
+ null, compoundValue);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -362,11 +371,12 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
|
|
Tables table) throws IOException {
|
|
Tables table) throws IOException {
|
|
switch (table) {
|
|
switch (table) {
|
|
case APPLICATION_TABLE:
|
|
case APPLICATION_TABLE:
|
|
- ApplicationColumn.ID.store(rowKey, applicationTable, null, te.getId());
|
|
|
|
- ApplicationColumn.CREATED_TIME.store(rowKey, applicationTable, null,
|
|
|
|
- te.getCreatedTime());
|
|
|
|
- ApplicationColumn.FLOW_VERSION.store(rowKey, applicationTable, null,
|
|
|
|
- flowVersion);
|
|
|
|
|
|
+ ColumnRWHelper.store(rowKey, applicationTable,
|
|
|
|
+ ApplicationColumn.ID, null, te.getId());
|
|
|
|
+ ColumnRWHelper.store(rowKey, applicationTable,
|
|
|
|
+ ApplicationColumn.CREATED_TIME, null, te.getCreatedTime());
|
|
|
|
+ ColumnRWHelper.store(rowKey, applicationTable,
|
|
|
|
+ ApplicationColumn.FLOW_VERSION, null, flowVersion);
|
|
storeInfo(rowKey, te.getInfo(), flowVersion, ApplicationColumnPrefix.INFO,
|
|
storeInfo(rowKey, te.getInfo(), flowVersion, ApplicationColumnPrefix.INFO,
|
|
applicationTable);
|
|
applicationTable);
|
|
storeMetrics(rowKey, te.getMetrics(), ApplicationColumnPrefix.METRIC,
|
|
storeMetrics(rowKey, te.getMetrics(), ApplicationColumnPrefix.METRIC,
|
|
@@ -381,11 +391,14 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
|
|
ApplicationColumnPrefix.RELATES_TO, applicationTable);
|
|
ApplicationColumnPrefix.RELATES_TO, applicationTable);
|
|
break;
|
|
break;
|
|
case ENTITY_TABLE:
|
|
case ENTITY_TABLE:
|
|
- EntityColumn.ID.store(rowKey, entityTable, null, te.getId());
|
|
|
|
- EntityColumn.TYPE.store(rowKey, entityTable, null, te.getType());
|
|
|
|
- EntityColumn.CREATED_TIME.store(rowKey, entityTable, null,
|
|
|
|
- te.getCreatedTime());
|
|
|
|
- EntityColumn.FLOW_VERSION.store(rowKey, entityTable, null, flowVersion);
|
|
|
|
|
|
+ ColumnRWHelper.store(rowKey, entityTable,
|
|
|
|
+ EntityColumn.ID, null, te.getId());
|
|
|
|
+ ColumnRWHelper.store(rowKey, entityTable,
|
|
|
|
+ EntityColumn.TYPE, null, te.getType());
|
|
|
|
+ ColumnRWHelper.store(rowKey, entityTable,
|
|
|
|
+ EntityColumn.CREATED_TIME, null, te.getCreatedTime());
|
|
|
|
+ ColumnRWHelper.store(rowKey, entityTable,
|
|
|
|
+ EntityColumn.FLOW_VERSION, null, flowVersion);
|
|
storeInfo(rowKey, te.getInfo(), flowVersion, EntityColumnPrefix.INFO,
|
|
storeInfo(rowKey, te.getInfo(), flowVersion, EntityColumnPrefix.INFO,
|
|
entityTable);
|
|
entityTable);
|
|
storeMetrics(rowKey, te.getMetrics(), EntityColumnPrefix.METRIC,
|
|
storeMetrics(rowKey, te.getMetrics(), EntityColumnPrefix.METRIC,
|
|
@@ -400,14 +413,14 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
|
|
EntityColumnPrefix.RELATES_TO, entityTable);
|
|
EntityColumnPrefix.RELATES_TO, entityTable);
|
|
break;
|
|
break;
|
|
case SUBAPPLICATION_TABLE:
|
|
case SUBAPPLICATION_TABLE:
|
|
- SubApplicationColumn.ID.store(rowKey, subApplicationTable, null,
|
|
|
|
- te.getId());
|
|
|
|
- SubApplicationColumn.TYPE.store(rowKey, subApplicationTable, null,
|
|
|
|
- te.getType());
|
|
|
|
- SubApplicationColumn.CREATED_TIME.store(rowKey, subApplicationTable, null,
|
|
|
|
- te.getCreatedTime());
|
|
|
|
- SubApplicationColumn.FLOW_VERSION.store(rowKey, subApplicationTable, null,
|
|
|
|
- flowVersion);
|
|
|
|
|
|
+ ColumnRWHelper.store(rowKey, subApplicationTable, SubApplicationColumn.ID,
|
|
|
|
+ null, te.getId());
|
|
|
|
+ ColumnRWHelper.store(rowKey, subApplicationTable,
|
|
|
|
+ SubApplicationColumn.TYPE, null, te.getType());
|
|
|
|
+ ColumnRWHelper.store(rowKey, subApplicationTable,
|
|
|
|
+ SubApplicationColumn.CREATED_TIME, null, te.getCreatedTime());
|
|
|
|
+ ColumnRWHelper.store(rowKey, subApplicationTable,
|
|
|
|
+ SubApplicationColumn.FLOW_VERSION, null, flowVersion);
|
|
storeInfo(rowKey, te.getInfo(), flowVersion,
|
|
storeInfo(rowKey, te.getInfo(), flowVersion,
|
|
SubApplicationColumnPrefix.INFO, subApplicationTable);
|
|
SubApplicationColumnPrefix.INFO, subApplicationTable);
|
|
storeMetrics(rowKey, te.getMetrics(), SubApplicationColumnPrefix.METRIC,
|
|
storeMetrics(rowKey, te.getMetrics(), SubApplicationColumnPrefix.METRIC,
|
|
@@ -430,12 +443,13 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
|
|
/**
|
|
/**
|
|
* stores the info information from {@linkplain TimelineEntity}.
|
|
* stores the info information from {@linkplain TimelineEntity}.
|
|
*/
|
|
*/
|
|
- private <T> void storeInfo(byte[] rowKey, Map<String, Object> info,
|
|
|
|
- String flowVersion, ColumnPrefix<T> columnPrefix,
|
|
|
|
- TypedBufferedMutator<T> table) throws IOException {
|
|
|
|
|
|
+ private <T extends BaseTable<T>> void storeInfo(byte[] rowKey,
|
|
|
|
+ Map<String, Object> info, String flowVersion,
|
|
|
|
+ ColumnPrefix<T> columnPrefix, TypedBufferedMutator<T > table)
|
|
|
|
+ throws IOException {
|
|
if (info != null) {
|
|
if (info != null) {
|
|
for (Map.Entry<String, Object> entry : info.entrySet()) {
|
|
for (Map.Entry<String, Object> entry : info.entrySet()) {
|
|
- columnPrefix.store(rowKey, table,
|
|
|
|
|
|
+ ColumnRWHelper.store(rowKey, table, columnPrefix,
|
|
stringKeyConverter.encode(entry.getKey()), null, entry.getValue());
|
|
stringKeyConverter.encode(entry.getKey()), null, entry.getValue());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -444,13 +458,15 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
|
|
/**
|
|
/**
|
|
* stores the config information from {@linkplain TimelineEntity}.
|
|
* stores the config information from {@linkplain TimelineEntity}.
|
|
*/
|
|
*/
|
|
- private <T> void storeConfig(byte[] rowKey, Map<String, String> config,
|
|
|
|
|
|
+ private <T extends BaseTable<T>> void storeConfig(
|
|
|
|
+ byte[] rowKey, Map<String, String> config,
|
|
ColumnPrefix<T> columnPrefix, TypedBufferedMutator<T> table)
|
|
ColumnPrefix<T> columnPrefix, TypedBufferedMutator<T> table)
|
|
throws IOException {
|
|
throws IOException {
|
|
if (config != null) {
|
|
if (config != null) {
|
|
for (Map.Entry<String, String> entry : config.entrySet()) {
|
|
for (Map.Entry<String, String> entry : config.entrySet()) {
|
|
byte[] configKey = stringKeyConverter.encode(entry.getKey());
|
|
byte[] configKey = stringKeyConverter.encode(entry.getKey());
|
|
- columnPrefix.store(rowKey, table, configKey, null, entry.getValue());
|
|
|
|
|
|
+ ColumnRWHelper.store(rowKey, table, columnPrefix, configKey,
|
|
|
|
+ null, entry.getValue());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -459,7 +475,8 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
|
|
* stores the {@linkplain TimelineMetric} information from the
|
|
* stores the {@linkplain TimelineMetric} information from the
|
|
* {@linkplain TimelineEvent} object.
|
|
* {@linkplain TimelineEvent} object.
|
|
*/
|
|
*/
|
|
- private <T> void storeMetrics(byte[] rowKey, Set<TimelineMetric> metrics,
|
|
|
|
|
|
+ private <T extends BaseTable<T>> void storeMetrics(
|
|
|
|
+ byte[] rowKey, Set<TimelineMetric> metrics,
|
|
ColumnPrefix<T> columnPrefix, TypedBufferedMutator<T> table)
|
|
ColumnPrefix<T> columnPrefix, TypedBufferedMutator<T> table)
|
|
throws IOException {
|
|
throws IOException {
|
|
if (metrics != null) {
|
|
if (metrics != null) {
|
|
@@ -469,8 +486,8 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
|
|
Map<Long, Number> timeseries = metric.getValues();
|
|
Map<Long, Number> timeseries = metric.getValues();
|
|
for (Map.Entry<Long, Number> timeseriesEntry : timeseries.entrySet()) {
|
|
for (Map.Entry<Long, Number> timeseriesEntry : timeseries.entrySet()) {
|
|
Long timestamp = timeseriesEntry.getKey();
|
|
Long timestamp = timeseriesEntry.getKey();
|
|
- columnPrefix.store(rowKey, table, metricColumnQualifier, timestamp,
|
|
|
|
- timeseriesEntry.getValue());
|
|
|
|
|
|
+ ColumnRWHelper.store(rowKey, table, columnPrefix,
|
|
|
|
+ metricColumnQualifier, timestamp, timeseriesEntry.getValue());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -479,7 +496,8 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
|
|
/**
|
|
/**
|
|
* Stores the events from the {@linkplain TimelineEvent} object.
|
|
* Stores the events from the {@linkplain TimelineEvent} object.
|
|
*/
|
|
*/
|
|
- private <T> void storeEvents(byte[] rowKey, Set<TimelineEvent> events,
|
|
|
|
|
|
+ private <T extends BaseTable<T>> void storeEvents(
|
|
|
|
+ byte[] rowKey, Set<TimelineEvent> events,
|
|
ColumnPrefix<T> columnPrefix, TypedBufferedMutator<T> table)
|
|
ColumnPrefix<T> columnPrefix, TypedBufferedMutator<T> table)
|
|
throws IOException {
|
|
throws IOException {
|
|
if (events != null) {
|
|
if (events != null) {
|
|
@@ -499,16 +517,16 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
|
|
byte[] columnQualifierBytes =
|
|
byte[] columnQualifierBytes =
|
|
new EventColumnName(eventId, eventTimestamp, null)
|
|
new EventColumnName(eventId, eventTimestamp, null)
|
|
.getColumnQualifier();
|
|
.getColumnQualifier();
|
|
- columnPrefix.store(rowKey, table, columnQualifierBytes, null,
|
|
|
|
- Separator.EMPTY_BYTES);
|
|
|
|
|
|
+ ColumnRWHelper.store(rowKey, table, columnPrefix,
|
|
|
|
+ columnQualifierBytes, null, Separator.EMPTY_BYTES);
|
|
} else {
|
|
} else {
|
|
for (Map.Entry<String, Object> info : eventInfo.entrySet()) {
|
|
for (Map.Entry<String, Object> info : eventInfo.entrySet()) {
|
|
// eventId=infoKey
|
|
// eventId=infoKey
|
|
byte[] columnQualifierBytes =
|
|
byte[] columnQualifierBytes =
|
|
new EventColumnName(eventId, eventTimestamp, info.getKey())
|
|
new EventColumnName(eventId, eventTimestamp, info.getKey())
|
|
.getColumnQualifier();
|
|
.getColumnQualifier();
|
|
- columnPrefix.store(rowKey, table, columnQualifierBytes, null,
|
|
|
|
- info.getValue());
|
|
|
|
|
|
+ ColumnRWHelper.store(rowKey, table, columnPrefix,
|
|
|
|
+ columnQualifierBytes, null, info.getValue());
|
|
} // for info: eventInfo
|
|
} // for info: eventInfo
|
|
}
|
|
}
|
|
}
|
|
}
|