|
@@ -18,32 +18,18 @@
|
|
|
package org.apache.hadoop.yarn.server.timelineservice.storage.common;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
-import java.net.MalformedURLException;
|
|
|
-import java.net.URL;
|
|
|
-import java.util.Arrays;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.HashSet;
|
|
|
-import java.util.List;
|
|
|
import java.util.Map;
|
|
|
-import java.util.Map.Entry;
|
|
|
import java.util.Set;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
|
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
|
|
-import org.apache.hadoop.conf.Configuration;
|
|
|
-import org.apache.hadoop.hbase.Cell;
|
|
|
-import org.apache.hadoop.hbase.CellUtil;
|
|
|
-import org.apache.hadoop.hbase.HBaseConfiguration;
|
|
|
-import org.apache.hadoop.hbase.HConstants;
|
|
|
-import org.apache.hadoop.hbase.KeyValue;
|
|
|
-import org.apache.hadoop.hbase.Tag;
|
|
|
-import org.apache.hadoop.hbase.util.Bytes;
|
|
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
|
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
|
|
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
|
|
|
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareFilter;
|
|
|
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareOp;
|
|
|
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineExistsFilter;
|
|
@@ -52,9 +38,6 @@ import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilte
|
|
|
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
|
|
|
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValueFilter;
|
|
|
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValuesFilter;
|
|
|
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
|
|
|
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation;
|
|
|
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
|
|
|
|
|
|
/**
|
|
|
* A bunch of utility functions used across TimelineReader and TimelineWriter.
|
|
@@ -67,133 +50,6 @@ public final class TimelineStorageUtils {
|
|
|
|
|
|
private static final Log LOG = LogFactory.getLog(TimelineStorageUtils.class);
|
|
|
|
|
|
- /** milliseconds in one day. */
|
|
|
- public static final long MILLIS_ONE_DAY = 86400000L;
|
|
|
-
|
|
|
- /**
|
|
|
- * Converts an int into it's inverse int to be used in (row) keys
|
|
|
- * where we want to have the largest int value in the top of the table
|
|
|
- * (scans start at the largest int first).
|
|
|
- *
|
|
|
- * @param key value to be inverted so that the latest version will be first in
|
|
|
- * a scan.
|
|
|
- * @return inverted int
|
|
|
- */
|
|
|
- public static int invertInt(int key) {
|
|
|
- return Integer.MAX_VALUE - key;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * returns the timestamp of that day's start (which is midnight 00:00:00 AM)
|
|
|
- * for a given input timestamp.
|
|
|
- *
|
|
|
- * @param ts Timestamp.
|
|
|
- * @return timestamp of that day's beginning (midnight)
|
|
|
- */
|
|
|
- public static long getTopOfTheDayTimestamp(long ts) {
|
|
|
- long dayTimestamp = ts - (ts % MILLIS_ONE_DAY);
|
|
|
- return dayTimestamp;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Combines the input array of attributes and the input aggregation operation
|
|
|
- * into a new array of attributes.
|
|
|
- *
|
|
|
- * @param attributes Attributes to be combined.
|
|
|
- * @param aggOp Aggregation operation.
|
|
|
- * @return array of combined attributes.
|
|
|
- */
|
|
|
- public static Attribute[] combineAttributes(Attribute[] attributes,
|
|
|
- AggregationOperation aggOp) {
|
|
|
- int newLength = getNewLengthCombinedAttributes(attributes, aggOp);
|
|
|
- Attribute[] combinedAttributes = new Attribute[newLength];
|
|
|
-
|
|
|
- if (attributes != null) {
|
|
|
- System.arraycopy(attributes, 0, combinedAttributes, 0, attributes.length);
|
|
|
- }
|
|
|
-
|
|
|
- if (aggOp != null) {
|
|
|
- Attribute a2 = aggOp.getAttribute();
|
|
|
- combinedAttributes[newLength - 1] = a2;
|
|
|
- }
|
|
|
- return combinedAttributes;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Returns a number for the new array size. The new array is the combination
|
|
|
- * of input array of attributes and the input aggregation operation.
|
|
|
- *
|
|
|
- * @param attributes Attributes.
|
|
|
- * @param aggOp Aggregation operation.
|
|
|
- * @return the size for the new array
|
|
|
- */
|
|
|
- private static int getNewLengthCombinedAttributes(Attribute[] attributes,
|
|
|
- AggregationOperation aggOp) {
|
|
|
- int oldLength = getAttributesLength(attributes);
|
|
|
- int aggLength = getAppOpLength(aggOp);
|
|
|
- return oldLength + aggLength;
|
|
|
- }
|
|
|
-
|
|
|
- private static int getAppOpLength(AggregationOperation aggOp) {
|
|
|
- if (aggOp != null) {
|
|
|
- return 1;
|
|
|
- }
|
|
|
- return 0;
|
|
|
- }
|
|
|
-
|
|
|
- private static int getAttributesLength(Attribute[] attributes) {
|
|
|
- if (attributes != null) {
|
|
|
- return attributes.length;
|
|
|
- }
|
|
|
- return 0;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Returns the first seen aggregation operation as seen in the list of input
|
|
|
- * tags or null otherwise.
|
|
|
- *
|
|
|
- * @param tags list of HBase tags.
|
|
|
- * @return AggregationOperation
|
|
|
- */
|
|
|
- public static AggregationOperation getAggregationOperationFromTagsList(
|
|
|
- List<Tag> tags) {
|
|
|
- for (AggregationOperation aggOp : AggregationOperation.values()) {
|
|
|
- for (Tag tag : tags) {
|
|
|
- if (tag.getType() == aggOp.getTagType()) {
|
|
|
- return aggOp;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- return null;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Creates a {@link Tag} from the input attribute.
|
|
|
- *
|
|
|
- * @param attribute Attribute from which tag has to be fetched.
|
|
|
- * @return a HBase Tag.
|
|
|
- */
|
|
|
- public static Tag getTagFromAttribute(Entry<String, byte[]> attribute) {
|
|
|
- // attribute could be either an Aggregation Operation or
|
|
|
- // an Aggregation Dimension
|
|
|
- // Get the Tag type from either
|
|
|
- AggregationOperation aggOp = AggregationOperation
|
|
|
- .getAggregationOperation(attribute.getKey());
|
|
|
- if (aggOp != null) {
|
|
|
- Tag t = new Tag(aggOp.getTagType(), attribute.getValue());
|
|
|
- return t;
|
|
|
- }
|
|
|
-
|
|
|
- AggregationCompactionDimension aggCompactDim =
|
|
|
- AggregationCompactionDimension.getAggregationCompactionDimension(
|
|
|
- attribute.getKey());
|
|
|
- if (aggCompactDim != null) {
|
|
|
- Tag t = new Tag(aggCompactDim.getTagType(), attribute.getValue());
|
|
|
- return t;
|
|
|
- }
|
|
|
- return null;
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* Matches key-values filter. Used for relatesTo/isRelatedTo filters.
|
|
|
*
|
|
@@ -520,125 +376,4 @@ public final class TimelineStorageUtils {
|
|
|
(obj instanceof Long);
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * creates a new cell based on the input cell but with the new value.
|
|
|
- *
|
|
|
- * @param origCell Original cell
|
|
|
- * @param newValue new cell value
|
|
|
- * @return cell
|
|
|
- * @throws IOException while creating new cell.
|
|
|
- */
|
|
|
- public static Cell createNewCell(Cell origCell, byte[] newValue)
|
|
|
- throws IOException {
|
|
|
- return CellUtil.createCell(CellUtil.cloneRow(origCell),
|
|
|
- CellUtil.cloneFamily(origCell), CellUtil.cloneQualifier(origCell),
|
|
|
- origCell.getTimestamp(), KeyValue.Type.Put.getCode(), newValue);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * creates a cell with the given inputs.
|
|
|
- *
|
|
|
- * @param row row of the cell to be created
|
|
|
- * @param family column family name of the new cell
|
|
|
- * @param qualifier qualifier for the new cell
|
|
|
- * @param ts timestamp of the new cell
|
|
|
- * @param newValue value of the new cell
|
|
|
- * @param tags tags in the new cell
|
|
|
- * @return cell
|
|
|
- * @throws IOException while creating the cell.
|
|
|
- */
|
|
|
- public static Cell createNewCell(byte[] row, byte[] family, byte[] qualifier,
|
|
|
- long ts, byte[] newValue, byte[] tags) throws IOException {
|
|
|
- return CellUtil.createCell(row, family, qualifier, ts, KeyValue.Type.Put,
|
|
|
- newValue, tags);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * returns app id from the list of tags.
|
|
|
- *
|
|
|
- * @param tags cell tags to be looked into
|
|
|
- * @return App Id as the AggregationCompactionDimension
|
|
|
- */
|
|
|
- public static String getAggregationCompactionDimension(List<Tag> tags) {
|
|
|
- String appId = null;
|
|
|
- for (Tag t : tags) {
|
|
|
- if (AggregationCompactionDimension.APPLICATION_ID.getTagType() == t
|
|
|
- .getType()) {
|
|
|
- appId = Bytes.toString(t.getValue());
|
|
|
- return appId;
|
|
|
- }
|
|
|
- }
|
|
|
- return appId;
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- /**
|
|
|
- * @param conf Yarn configuration. Used to see if there is an explicit config
|
|
|
- * pointing to the HBase config file to read. If null then a new
|
|
|
- * HBase configuration will be returned.
|
|
|
- * @return a configuration with the HBase configuration from the classpath,
|
|
|
- * optionally overwritten by the timeline service configuration URL if
|
|
|
- * specified.
|
|
|
- * @throws MalformedURLException if a timeline service HBase configuration URL
|
|
|
- * is specified but is a malformed URL.
|
|
|
- */
|
|
|
- public static Configuration getTimelineServiceHBaseConf(Configuration conf)
|
|
|
- throws MalformedURLException {
|
|
|
- Configuration hbaseConf;
|
|
|
-
|
|
|
- if (conf == null) {
|
|
|
- return HBaseConfiguration.create();
|
|
|
- }
|
|
|
-
|
|
|
- String timelineServiceHBaseConfFileURL =
|
|
|
- conf.get(YarnConfiguration.TIMELINE_SERVICE_HBASE_CONFIGURATION_FILE);
|
|
|
- if (timelineServiceHBaseConfFileURL != null
|
|
|
- && timelineServiceHBaseConfFileURL.length() > 0) {
|
|
|
- // create a clone so that we don't mess with out input one
|
|
|
- hbaseConf = new Configuration(conf);
|
|
|
- Configuration plainHBaseConf = new Configuration(false);
|
|
|
- URL hbaseSiteXML = new URL(timelineServiceHBaseConfFileURL);
|
|
|
- plainHBaseConf.addResource(hbaseSiteXML);
|
|
|
- HBaseConfiguration.merge(hbaseConf, plainHBaseConf);
|
|
|
- } else {
|
|
|
- // default to what is on the classpath
|
|
|
- hbaseConf = HBaseConfiguration.create(conf);
|
|
|
- }
|
|
|
- return hbaseConf;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Given a row key prefix stored in a byte array, return a byte array for its
|
|
|
- * immediate next row key.
|
|
|
- *
|
|
|
- * @param rowKeyPrefix The provided row key prefix, represented in an array.
|
|
|
- * @return the closest next row key of the provided row key.
|
|
|
- */
|
|
|
- public static byte[] calculateTheClosestNextRowKeyForPrefix(
|
|
|
- byte[] rowKeyPrefix) {
|
|
|
- // Essentially we are treating it like an 'unsigned very very long' and
|
|
|
- // doing +1 manually.
|
|
|
- // Search for the place where the trailing 0xFFs start
|
|
|
- int offset = rowKeyPrefix.length;
|
|
|
- while (offset > 0) {
|
|
|
- if (rowKeyPrefix[offset - 1] != (byte) 0xFF) {
|
|
|
- break;
|
|
|
- }
|
|
|
- offset--;
|
|
|
- }
|
|
|
-
|
|
|
- if (offset == 0) {
|
|
|
- // We got an 0xFFFF... (only FFs) stopRow value which is
|
|
|
- // the last possible prefix before the end of the table.
|
|
|
- // So set it to stop at the 'end of the table'
|
|
|
- return HConstants.EMPTY_END_ROW;
|
|
|
- }
|
|
|
-
|
|
|
- // Copy the right length of the original
|
|
|
- byte[] newStopRow = Arrays.copyOfRange(rowKeyPrefix, 0, offset);
|
|
|
- // And increment the last one
|
|
|
- newStopRow[newStopRow.length - 1]++;
|
|
|
- return newStopRow;
|
|
|
- }
|
|
|
-
|
|
|
}
|