Browse Source

YARN-6027. Support fromid(offset) filter for /flows API (Rohith Sharma K S via Varun Saxena)

Varun Saxena 8 years ago
parent
commit
f9a18eac12
10 changed files with 523 additions and 243 deletions
  1. 176 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/AbstractTimelineReaderHBaseTestBase.java
  2. 156 210
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java
  3. 38 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/KeyConverterToString.java
  4. 55 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
  5. 25 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java
  6. 21 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java
  7. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java
  8. 27 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderUtils.java
  9. 14 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java
  10. 10 20
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineUIDConverter.java

+ 176 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/AbstractTimelineReaderHBaseTestBase.java

@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.reader;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.net.HttpURLConnection;
+import java.net.URI;
+import java.net.URL;
+import java.util.List;
+
+import javax.ws.rs.core.MediaType;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.storage.DataGeneratorForTest;
+import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
+import org.junit.Assert;
+
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.ClientResponse.Status;
+import com.sun.jersey.api.client.GenericType;
+import com.sun.jersey.api.client.config.ClientConfig;
+import com.sun.jersey.api.client.config.DefaultClientConfig;
+import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
+import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
+
+/**
+ * Test Base for TimelineReaderServer HBase tests.
+ */
+public abstract class AbstractTimelineReaderHBaseTestBase {
+  private static int serverPort;
+  private static TimelineReaderServer server;
+  private static HBaseTestingUtility util;
+
+  public static void setup() throws Exception {
+    util = new HBaseTestingUtility();
+    Configuration conf = util.getConfiguration();
+    conf.setInt("hfile.format.version", 3);
+    util.startMiniCluster();
+    DataGeneratorForTest.createSchema(util.getConfiguration());
+  }
+
+  public static void tearDown() throws Exception {
+    if (server != null) {
+      server.stop();
+      server = null;
+    }
+    if (util != null) {
+      util.shutdownMiniCluster();
+    }
+  }
+
+  protected static void initialize() throws Exception {
+    try {
+      Configuration config = util.getConfiguration();
+      config.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+      config.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
+      config.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
+          "localhost:0");
+      config.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1");
+      config.set(YarnConfiguration.TIMELINE_SERVICE_READER_CLASS,
+          "org.apache.hadoop.yarn.server.timelineservice.storage."
+              + "HBaseTimelineReaderImpl");
+      config.setInt("hfile.format.version", 3);
+      server = new TimelineReaderServer() {
+        @Override
+        protected void setupOptions(Configuration conf) {
+          // The parent code tries to use HttpServer2 from this version of
+          // Hadoop, but the tests are loading in HttpServer2 from
+          // ${hbase-compatible-hadoop.version}. This version uses Jetty 9
+          // while ${hbase-compatible-hadoop.version} uses Jetty 6, and there
+          // are many differences, including classnames and packages.
+          // We do nothing here, so that we don't cause a NoSuchMethodError.
+          // Once ${hbase-compatible-hadoop.version} is changed to Hadoop 3,
+          // we should be able to remove this @Override.
+        }
+      };
+      server.init(config);
+      server.start();
+      serverPort = server.getWebServerPort();
+    } catch (Exception e) {
+      Assert.fail("Web server failed to start");
+    }
+  }
+
+  protected Client createClient() {
+    ClientConfig cfg = new DefaultClientConfig();
+    cfg.getClasses().add(YarnJacksonJaxbJsonProvider.class);
+    return new Client(
+        new URLConnectionClientHandler(new DummyURLConnectionFactory()), cfg);
+  }
+
+  protected ClientResponse getResponse(Client client, URI uri)
+      throws Exception {
+    ClientResponse resp =
+        client.resource(uri).accept(MediaType.APPLICATION_JSON)
+            .type(MediaType.APPLICATION_JSON).get(ClientResponse.class);
+    if (resp == null || resp.getStatusInfo()
+        .getStatusCode() != ClientResponse.Status.OK.getStatusCode()) {
+      String msg = "";
+      if (resp != null) {
+        msg = String.valueOf(resp.getStatusInfo().getStatusCode());
+      }
+      throw new IOException(
+          "Incorrect response from timeline reader. " + "Status=" + msg);
+    }
+    return resp;
+  }
+
+  protected void verifyHttpResponse(Client client, URI uri, Status status) {
+    ClientResponse resp =
+        client.resource(uri).accept(MediaType.APPLICATION_JSON)
+            .type(MediaType.APPLICATION_JSON).get(ClientResponse.class);
+    assertNotNull(resp);
+    assertTrue("Response from server should have been " + status,
+        resp.getStatusInfo().getStatusCode() == status.getStatusCode());
+    System.out.println("Response is: " + resp.getEntity(String.class));
+  }
+
+  protected List<FlowActivityEntity> verifyFlowEntites(Client client, URI uri,
+      int noOfEntities) throws Exception {
+    ClientResponse resp = getResponse(client, uri);
+    List<FlowActivityEntity> entities =
+        resp.getEntity(new GenericType<List<FlowActivityEntity>>() {
+        });
+    assertNotNull(entities);
+    assertEquals(noOfEntities, entities.size());
+    return entities;
+  }
+
+  protected static class DummyURLConnectionFactory
+      implements HttpURLConnectionFactory {
+
+    @Override
+    public HttpURLConnection getHttpURLConnection(final URL url)
+        throws IOException {
+      try {
+        return (HttpURLConnection) url.openConnection();
+      } catch (UndeclaredThrowableException e) {
+        throw new IOException(e.getCause());
+      }
+    }
+  }
+
+  protected static HBaseTestingUtility getHBaseTestingUtility() {
+    return util;
+  }
+
+  public static int getServerPort() {
+    return serverPort;
+  }
+}

File diff suppressed because it is too large
+ 156 - 210
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java


+ 38 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/KeyConverterToString.java

@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+/**
+ * Interface which has to be implemented for encoding and decoding row keys or
+ * column qualifiers as string.
+ */
+public interface KeyConverterToString<T> {
+  /**
+   * Encode key as string.
+   * @param key of type T to be encoded as string.
+   * @return encoded value as string.
+   */
+  String encodeAsString(T key);
+
+  /**
+   * Decode row key from string to a key of type T.
+   * @param encodedKey string representation of row key
+   * @return type T which has been constructed after decoding string.
+   */
+  T decodeFromString(String encodedKey);
+}

+ 55 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java

@@ -17,10 +17,14 @@
  */
 package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
 
+import java.util.List;
+
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverterToString;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
 
 /**
@@ -32,8 +36,8 @@ public class FlowActivityRowKey {
   private final Long dayTs;
   private final String userId;
   private final String flowName;
-  private final KeyConverter<FlowActivityRowKey> flowActivityRowKeyConverter =
-      new FlowActivityRowKeyConverter();
+  private final FlowActivityRowKeyConverter
+      flowActivityRowKeyConverter = new FlowActivityRowKeyConverter();
 
   /**
    * @param clusterId identifying the cluster
@@ -103,14 +107,33 @@ public class FlowActivityRowKey {
     return new FlowActivityRowKeyConverter().decode(rowKey);
   }
 
+  /**
+   * Constructs a row key for the flow activity table as follows:
+   * {@code clusterId!dayTimestamp!user!flowName}.
+   * @return String representation of row key
+   */
+  public String getRowKeyAsString() {
+    return flowActivityRowKeyConverter.encodeAsString(this);
+  }
+
+  /**
+   * Given the raw row key as string, returns the row key as an object.
+   * @param encodedRowKey String representation of row key.
+   * @return A <cite>FlowActivityRowKey</cite> object.
+   */
+  public static FlowActivityRowKey parseRowKeyFromString(String encodedRowKey) {
+    return new FlowActivityRowKeyConverter().decodeFromString(encodedRowKey);
+  }
+
   /**
    * Encodes and decodes row key for flow activity table. The row key is of the
    * form : clusterId!dayTimestamp!user!flowName. dayTimestamp(top of the day
    * timestamp) is a long and rest are strings.
    * <p>
    */
-  final private static class FlowActivityRowKeyConverter implements
-      KeyConverter<FlowActivityRowKey> {
+  final private static class FlowActivityRowKeyConverter
+      implements KeyConverter<FlowActivityRowKey>,
+      KeyConverterToString<FlowActivityRowKey> {
 
     private FlowActivityRowKeyConverter() {
     }
@@ -192,5 +215,33 @@ public class FlowActivityRowKey {
               Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
       return new FlowActivityRowKey(clusterId, dayTs, userId, flowName);
     }
+
+    @Override
+    public String encodeAsString(FlowActivityRowKey key) {
+      if (key.getDayTimestamp() == null) {
+        return TimelineReaderUtils
+            .joinAndEscapeStrings(new String[] {key.clusterId});
+      } else if (key.getUserId() == null) {
+        return TimelineReaderUtils.joinAndEscapeStrings(
+            new String[] {key.clusterId, key.dayTs.toString()});
+      } else if (key.getFlowName() == null) {
+        return TimelineReaderUtils.joinAndEscapeStrings(
+            new String[] {key.clusterId, key.dayTs.toString(), key.userId});
+      }
+      return TimelineReaderUtils.joinAndEscapeStrings(new String[] {
+          key.clusterId, key.dayTs.toString(), key.userId, key.flowName});
+    }
+
+    @Override
+    public FlowActivityRowKey decodeFromString(String encodedRowKey) {
+      List<String> split = TimelineReaderUtils.split(encodedRowKey);
+      if (split == null || split.size() != 4) {
+        throw new IllegalArgumentException(
+            "Invalid row key for flow activity.");
+      }
+      Long dayTs = Long.valueOf(split.get(1));
+      return new FlowActivityRowKey(split.get(0), dayTs, split.get(2),
+          split.get(3));
+    }
   }
 }

+ 25 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java

@@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongKeyConverter;
@@ -41,6 +42,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityCo
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKeyPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
+import org.apache.hadoop.yarn.webapp.BadRequestException;
 
 import com.google.common.base.Preconditions;
 
@@ -110,11 +112,30 @@ class FlowActivityEntityReader extends TimelineEntityReader {
       Connection conn, FilterList filterList) throws IOException {
     Scan scan = new Scan();
     String clusterId = getContext().getClusterId();
-    if (getFilters().getCreatedTimeBegin() == 0L &&
-        getFilters().getCreatedTimeEnd() == Long.MAX_VALUE) {
+    if (getFilters().getFromId() == null
+        && getFilters().getCreatedTimeBegin() == 0L
+        && getFilters().getCreatedTimeEnd() == Long.MAX_VALUE) {
        // All records have to be chosen.
       scan.setRowPrefixFilter(new FlowActivityRowKeyPrefix(clusterId)
           .getRowKeyPrefix());
+    } else if (getFilters().getFromId() != null) {
+      FlowActivityRowKey key = null;
+      try {
+        key =
+            FlowActivityRowKey.parseRowKeyFromString(getFilters().getFromId());
+      } catch (IllegalArgumentException e) {
+        throw new BadRequestException("Invalid filter fromid is provided.");
+      }
+      if (!clusterId.equals(key.getClusterId())) {
+        throw new BadRequestException(
+            "fromid doesn't belong to clusterId=" + clusterId);
+      }
+      scan.setStartRow(key.getRowKey());
+      scan.setStopRow(
+          new FlowActivityRowKeyPrefix(clusterId,
+              (getFilters().getCreatedTimeBegin() <= 0 ? 0
+                  : (getFilters().getCreatedTimeBegin() - 1)))
+                      .getRowKeyPrefix());
     } else {
       scan.setStartRow(new FlowActivityRowKeyPrefix(clusterId, getFilters()
           .getCreatedTimeEnd()).getRowKeyPrefix());
@@ -157,7 +178,8 @@ class FlowActivityEntityReader extends TimelineEntityReader {
       flowRun.setId(flowRun.getId());
       flowActivity.addFlowRun(flowRun);
     }
-
+    flowActivity.getInfo().put(TimelineReaderUtils.FROMID_KEY,
+        rowKey.getRowKeyAsString());
     return flowActivity;
   }
 }

+ 21 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java

@@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
 import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKeyPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
@@ -224,6 +225,26 @@ public class TestRowKeys {
     verifyRowPrefixBytes(byteRowKeyPrefix);
   }
 
+  @Test
+  public void testFlowActivityRowKeyAsString() {
+    String cluster = "cl" + TimelineReaderUtils.DEFAULT_DELIMITER_CHAR + "uster"
+        + TimelineReaderUtils.DEFAULT_ESCAPE_CHAR;
+    String user = TimelineReaderUtils.DEFAULT_ESCAPE_CHAR + "user";
+    String fName = "dummy_" + TimelineReaderUtils.DEFAULT_DELIMITER_CHAR
+        + TimelineReaderUtils.DEFAULT_ESCAPE_CHAR + "flow"
+        + TimelineReaderUtils.DEFAULT_DELIMITER_CHAR;
+    Long ts = 1459900830000L;
+    Long dayTimestamp = HBaseTimelineStorageUtils.getTopOfTheDayTimestamp(ts);
+    String rowKeyAsString =
+        new FlowActivityRowKey(cluster, ts, user, fName).getRowKeyAsString();
+    FlowActivityRowKey rowKey =
+        FlowActivityRowKey.parseRowKeyFromString(rowKeyAsString);
+    assertEquals(cluster, rowKey.getClusterId());
+    assertEquals(dayTimestamp, rowKey.getDayTimestamp());
+    assertEquals(user, rowKey.getUserId());
+    assertEquals(fName, rowKey.getFlowName());
+  }
+
   @Test
   public void testFlowRunRowKey() {
     byte[] byteRowKey =

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java

@@ -185,7 +185,7 @@ public class TimelineReaderServer extends CompositeService {
   }
 
   @VisibleForTesting
-  int getWebServerPort() {
+  public int getWebServerPort() {
     return readerWebServer.getConnectorAddress(0).getPort();
   }
 

+ 27 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderUtils.java

@@ -24,13 +24,29 @@ import java.util.List;
 
 import org.apache.commons.lang.StringUtils;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * Set of utility methods to be used across timeline reader.
  */
-final class TimelineReaderUtils {
+public final class TimelineReaderUtils {
   private TimelineReaderUtils() {
   }
 
+  /**
+   * Default delimiter for joining strings.
+   */
+  @VisibleForTesting
+  public static final char DEFAULT_DELIMITER_CHAR = '!';
+
+  /**
+   * Default escape character used for joining strings.
+   */
+  @VisibleForTesting
+  public static final char DEFAULT_ESCAPE_CHAR = '*';
+
+  public static final String FROMID_KEY = "FROM_ID";
+
   /**
    * Split the passed string along the passed delimiter character while looking
    * for escape char to interpret the splitted parts correctly. For delimiter or
@@ -168,4 +184,14 @@ final class TimelineReaderUtils {
     // Join the strings after they have been escaped.
     return StringUtils.join(strs, delimiterChar);
   }
+
+  public static List<String> split(final String str)
+      throws IllegalArgumentException {
+    return split(str, DEFAULT_DELIMITER_CHAR, DEFAULT_ESCAPE_CHAR);
+  }
+
+  public static String joinAndEscapeStrings(final String[] strs) {
+    return joinAndEscapeStrings(strs, DEFAULT_DELIMITER_CHAR,
+        DEFAULT_ESCAPE_CHAR);
+  }
 }

+ 14 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java

@@ -1334,6 +1334,10 @@ public class TimelineReaderWebServices {
    *     2 dates.
    *     "daterange=20150711-" returns flows active on and after 20150711.
    *     "daterange=-20150711" returns flows active on and before 20150711.
+   * @param fromId If specified, retrieve the next set of flows from the given
+   *     fromId. The set of flows retrieved is inclusive of specified fromId.
+   *     fromId should be taken from the value associated with FROM_ID info key
+   *     in flow entity response which was sent earlier.
    *
    * @return If successful, a HTTP 200(OK) response having a JSON representing a
    *     set of <cite>FlowActivityEntity</cite> instances are returned.<br>
@@ -1350,8 +1354,9 @@ public class TimelineReaderWebServices {
       @Context HttpServletRequest req,
       @Context HttpServletResponse res,
       @QueryParam("limit") String limit,
-      @QueryParam("daterange") String dateRange) {
-    return getFlows(req, res, null, limit, dateRange);
+      @QueryParam("daterange") String dateRange,
+      @QueryParam("fromid") String fromId) {
+    return getFlows(req, res, null, limit, dateRange, fromId);
   }
 
   /**
@@ -1380,6 +1385,10 @@ public class TimelineReaderWebServices {
    *     2 dates.
    *     "daterange=20150711-" returns flows active on and after 20150711.
    *     "daterange=-20150711" returns flows active on and before 20150711.
+   * @param fromId If specified, retrieve the next set of flows from the given
+   *     fromId. The set of flows retrieved is inclusive of specified fromId.
+   *     fromId should be taken from the value associated with FROM_ID info key
+   *     in flow entity response which was sent earlier.
    *
    * @return If successful, a HTTP 200(OK) response having a JSON representing a
    *     set of <cite>FlowActivityEntity</cite> instances are returned.<br>
@@ -1397,7 +1406,8 @@ public class TimelineReaderWebServices {
       @Context HttpServletResponse res,
       @PathParam("clusterid") String clusterId,
       @QueryParam("limit") String limit,
-      @QueryParam("daterange") String dateRange) {
+      @QueryParam("daterange") String dateRange,
+      @QueryParam("fromid") String fromId) {
     String url = req.getRequestURI() +
         (req.getQueryString() == null ? "" :
             QUERY_STRING_SEP + req.getQueryString());
@@ -1414,7 +1424,7 @@ public class TimelineReaderWebServices {
       TimelineEntityFilters entityFilters =
           TimelineReaderWebServicesUtils.createTimelineEntityFilters(
               limit, null, null, null, null, null, null, null, null, null,
-              null);
+              fromId);
       entityFilters.setCreatedTimeBegin(range.dateStart);
       entityFilters.setCreatedTimeEnd(range.dateEnd);
       entities = timelineReaderManager.getEntities(

+ 10 - 20
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineUIDConverter.java

@@ -195,39 +195,29 @@ enum TimelineUIDConverter {
   };
 
   /**
-   * Delimiter used for UID.
-   */
-  public static final char UID_DELIMITER_CHAR = '!';
-
-  /**
-   * Escape Character used if delimiter or escape character itself is part of
-   * different components of UID.
-   */
-  public static final char UID_ESCAPE_CHAR = '*';
-
-  /**
-   * Split UID using {@link #UID_DELIMITER_CHAR} and {@link #UID_ESCAPE_CHAR}.
+   * Split UID using {@link TimelineReaderUtils#DEFAULT_DELIMITER_CHAR} and
+   * {@link TimelineReaderUtils#DEFAULT_ESCAPE_CHAR}.
    * @param uid UID to be splitted.
    * @return a list of different parts of UID split across delimiter.
    * @throws IllegalArgumentException if UID is not properly escaped.
    */
   private static List<String> splitUID(String uid)
       throws IllegalArgumentException {
-    return TimelineReaderUtils.split(uid, UID_DELIMITER_CHAR, UID_ESCAPE_CHAR);
+    return TimelineReaderUtils.split(uid);
   }
 
   /**
-   * Join different parts of UID delimited by {@link #UID_DELIMITER_CHAR} with
-   * delimiter and escape character escaped using {@link #UID_ESCAPE_CHAR} if
-   * UID parts contain them.
+   * Join different parts of UID delimited by
+   * {@link TimelineReaderUtils#DEFAULT_DELIMITER_CHAR} with delimiter and
+   * escape character escaped using
+   * {@link TimelineReaderUtils#DEFAULT_ESCAPE_CHAR} if UID parts contain them.
    * @param parts an array of UID parts to be joined.
    * @return a string joined using the delimiter with escape and delimiter
-   *     characters escaped if they are part of the string parts to be joined.
-   *     Returns null if one of the parts is null.
+   *         characters escaped if they are part of the string parts to be
+   *         joined. Returns null if one of the parts is null.
    */
   private static String joinAndEscapeUIDParts(String[] parts) {
-    return TimelineReaderUtils.joinAndEscapeStrings(parts, UID_DELIMITER_CHAR,
-        UID_ESCAPE_CHAR);
+    return TimelineReaderUtils.joinAndEscapeStrings(parts);
   }
 
   /**

Some files were not shown because too many files changed in this diff