|
@@ -0,0 +1,181 @@
|
|
|
|
+/**
|
|
|
|
+ * Licensed to the Apache Software Foundation (ASF) under one
|
|
|
|
+ * or more contributor license agreements. See the NOTICE file
|
|
|
|
+ * distributed with this work for additional information
|
|
|
|
+ * regarding copyright ownership. The ASF licenses this file
|
|
|
|
+ * to you under the Apache License, Version 2.0 (the
|
|
|
|
+ * "License"); you may not use this file except in compliance
|
|
|
|
+ * with the License. You may obtain a copy of the License at
|
|
|
|
+ *
|
|
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
+ *
|
|
|
|
+ * Unless required by applicable law or agreed to in writing, software
|
|
|
|
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
+ * See the License for the specific language governing permissions and
|
|
|
|
+ * limitations under the License.
|
|
|
|
+ */
|
|
|
|
+package org.apache.hadoop.yarn.server.timelineservice.storage.reader;
|
|
|
|
+
|
|
|
|
+import com.google.common.base.Preconditions;
|
|
|
|
+import org.apache.commons.logging.Log;
|
|
|
|
+import org.apache.commons.logging.LogFactory;
|
|
|
|
+import org.apache.hadoop.conf.Configuration;
|
|
|
|
+import org.apache.hadoop.hbase.client.Connection;
|
|
|
|
+import org.apache.hadoop.hbase.client.Result;
|
|
|
|
+import org.apache.hadoop.hbase.client.ResultScanner;
|
|
|
|
+import org.apache.hadoop.hbase.client.Scan;
|
|
|
|
+import org.apache.hadoop.hbase.filter.FilterList;
|
|
|
|
+import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
|
|
|
|
+import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
|
|
|
|
+import org.apache.hadoop.hbase.filter.PageFilter;
|
|
|
|
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
|
|
|
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
|
|
|
|
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
|
|
|
|
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
|
|
|
|
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
|
|
|
|
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
|
|
|
|
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKeyPrefix;
|
|
|
|
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
|
|
|
|
+
|
|
|
|
+import java.io.IOException;
|
|
|
|
+import java.util.Arrays;
|
|
|
|
+import java.util.Set;
|
|
|
|
+import java.util.TreeSet;
|
|
|
|
+
|
|
|
|
+/**
|
|
|
|
+ * Timeline entity reader for listing all available entity types given one
|
|
|
|
+ * reader context. Right now only supports listing all entity types within one
|
|
|
|
+ * YARN application.
|
|
|
|
+ */
|
|
|
|
+public final class EntityTypeReader extends AbstractTimelineStorageReader {
|
|
|
|
+
|
|
|
|
+ private static final Log LOG = LogFactory.getLog(EntityTypeReader.class);
|
|
|
|
+ private static final EntityTable ENTITY_TABLE = new EntityTable();
|
|
|
|
+
|
|
|
|
+ public EntityTypeReader(TimelineReaderContext context) {
|
|
|
|
+ super(context);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Reads a set of timeline entity types from the HBase storage for the given
|
|
|
|
+ * context.
|
|
|
|
+ *
|
|
|
|
+ * @param hbaseConf HBase Configuration.
|
|
|
|
+ * @param conn HBase Connection.
|
|
|
|
+ * @return a set of <cite>TimelineEntity</cite> objects, with only type field
|
|
|
|
+ * set.
|
|
|
|
+ * @throws IOException if any exception is encountered while reading entities.
|
|
|
|
+ */
|
|
|
|
+ public Set<String> readEntityTypes(Configuration hbaseConf,
|
|
|
|
+ Connection conn) throws IOException {
|
|
|
|
+
|
|
|
|
+ validateParams();
|
|
|
|
+ augmentParams(hbaseConf, conn);
|
|
|
|
+
|
|
|
|
+ Set<String> types = new TreeSet<>();
|
|
|
|
+ TimelineReaderContext context = getContext();
|
|
|
|
+ EntityRowKeyPrefix prefix = new EntityRowKeyPrefix(context.getClusterId(),
|
|
|
|
+ context.getUserId(), context.getFlowName(), context.getFlowRunId(),
|
|
|
|
+ context.getAppId());
|
|
|
|
+ byte[] currRowKey = prefix.getRowKeyPrefix();
|
|
|
|
+ byte[] nextRowKey = prefix.getRowKeyPrefix();
|
|
|
|
+ nextRowKey[nextRowKey.length - 1]++;
|
|
|
|
+
|
|
|
|
+ FilterList typeFilterList = new FilterList();
|
|
|
|
+ typeFilterList.addFilter(new FirstKeyOnlyFilter());
|
|
|
|
+ typeFilterList.addFilter(new KeyOnlyFilter());
|
|
|
|
+ typeFilterList.addFilter(new PageFilter(1));
|
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
|
+ LOG.debug("FilterList created for scan is - " + typeFilterList);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ int counter = 0;
|
|
|
|
+ while (true) {
|
|
|
|
+ try (ResultScanner results
|
|
|
|
+ = getResult(hbaseConf, conn, typeFilterList, currRowKey, nextRowKey))
|
|
|
|
+ {
|
|
|
|
+ TimelineEntity entity = parseEntityForType(results.next());
|
|
|
|
+ if (entity == null) {
|
|
|
|
+ break;
|
|
|
|
+ }
|
|
|
|
+ ++counter;
|
|
|
|
+ if (!types.add(entity.getType())) {
|
|
|
|
+ LOG.warn("Failed to add type " + entity.getType()
|
|
|
|
+ + " to the result set because there is a duplicated copy. ");
|
|
|
|
+ }
|
|
|
|
+ String currType = entity.getType();
|
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
|
+ LOG.debug("Current row key: " + Arrays.toString(currRowKey));
|
|
|
|
+ LOG.debug("New entity type discovered: " + currType);
|
|
|
|
+ }
|
|
|
|
+ currRowKey = getNextRowKey(prefix.getRowKeyPrefix(), currType);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
|
+ LOG.debug("Scanned " + counter + "records for "
|
|
|
|
+ + types.size() + "types");
|
|
|
|
+ }
|
|
|
|
+ return types;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ protected void validateParams() {
|
|
|
|
+ Preconditions.checkNotNull(getContext(), "context shouldn't be null");
|
|
|
|
+ Preconditions.checkNotNull(getContext().getClusterId(),
|
|
|
|
+ "clusterId shouldn't be null");
|
|
|
|
+ Preconditions.checkNotNull(getContext().getAppId(),
|
|
|
|
+ "appId shouldn't be null");
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Gets the possibly next row key prefix given current prefix and type.
|
|
|
|
+ *
|
|
|
|
+ * @param currRowKeyPrefix The current prefix that contains user, cluster,
|
|
|
|
+ * flow, run, and application id.
|
|
|
|
+ * @param entityType Current entity type.
|
|
|
|
+ * @return A new prefix for the possibly immediately next row key.
|
|
|
|
+ */
|
|
|
|
+ private static byte[] getNextRowKey(byte[] currRowKeyPrefix,
|
|
|
|
+ String entityType) {
|
|
|
|
+ if (currRowKeyPrefix == null || entityType == null) {
|
|
|
|
+ return null;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ byte[] entityTypeEncoded = Separator.QUALIFIERS.join(
|
|
|
|
+ Separator.encode(entityType, Separator.SPACE, Separator.TAB,
|
|
|
|
+ Separator.QUALIFIERS),
|
|
|
|
+ Separator.EMPTY_BYTES);
|
|
|
|
+
|
|
|
|
+ byte[] currRowKey
|
|
|
|
+ = new byte[currRowKeyPrefix.length + entityTypeEncoded.length];
|
|
|
|
+ System.arraycopy(currRowKeyPrefix, 0, currRowKey, 0,
|
|
|
|
+ currRowKeyPrefix.length);
|
|
|
|
+ System.arraycopy(entityTypeEncoded, 0, currRowKey, currRowKeyPrefix.length,
|
|
|
|
+ entityTypeEncoded.length);
|
|
|
|
+
|
|
|
|
+ return HBaseTimelineStorageUtils.
|
|
|
|
+ calculateTheClosestNextRowKeyForPrefix(currRowKey);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private ResultScanner getResult(Configuration hbaseConf, Connection conn,
|
|
|
|
+ FilterList filterList, byte[] startPrefix, byte[] endPrefix)
|
|
|
|
+ throws IOException {
|
|
|
|
+ Scan scan = new Scan(startPrefix, endPrefix);
|
|
|
|
+ scan.setFilter(filterList);
|
|
|
|
+ scan.setSmall(true);
|
|
|
|
+ return ENTITY_TABLE.getResultScanner(hbaseConf, conn, scan);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private TimelineEntity parseEntityForType(Result result)
|
|
|
|
+ throws IOException {
|
|
|
|
+ if (result == null || result.isEmpty()) {
|
|
|
|
+ return null;
|
|
|
|
+ }
|
|
|
|
+ TimelineEntity entity = new TimelineEntity();
|
|
|
|
+ EntityRowKey newRowKey = EntityRowKey.parseRowKey(result.getRow());
|
|
|
|
+ entity.setType(newRowKey.getEntityType());
|
|
|
|
+ return entity;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+}
|