Pārlūkot izejas kodu

YARN-3040. Make putEntities operation be aware of the app's context. Contributed by Zhijie Shen

Junping Du 10 gadi atpakaļ
vecāks
revīzija
18cad84a6c
36 mainītis faili ar 957 papildinājumiem un 144 dzēšanām
  1. 3 0
      hadoop-yarn-project/CHANGES.txt
  2. 1 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
  3. 26 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
  4. 74 51
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
  5. 16 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java
  6. 16 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/CollectorNodemanagerProtocol.java
  7. 20 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/CollectorNodemanagerProtocolPBClientImpl.java
  8. 21 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/CollectorNodemanagerProtocolPBServiceImpl.java
  9. 37 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/GetTimelineCollectorContextRequest.java
  10. 46 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/GetTimelineCollectorContextResponse.java
  11. 127 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/GetTimelineCollectorContextRequestPBImpl.java
  12. 141 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/GetTimelineCollectorContextResponsePBImpl.java
  13. 1 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/collectornodemanager_protocol.proto
  14. 9 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
  15. 39 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
  16. 17 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java
  17. 11 5
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
  18. 4 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java
  19. 15 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
  20. 2 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java
  21. 10 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockApp.java
  22. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java
  23. 22 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java
  24. 7 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollector.java
  25. 18 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
  26. 28 5
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java
  27. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java
  28. 13 6
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
  29. 81 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorContext.java
  30. 28 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
  31. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java
  32. 32 37
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
  33. 8 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java
  34. 31 12
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java
  35. 35 6
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollectorManager.java
  36. 15 7
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -41,6 +41,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
     YARN-3034. Implement RM starting its timeline collector. (Naganarasimha G R
     via junping_du)
 
+    YARN-3040. Make putEntities operation be aware of the app's context. (Zhijie Shen 
+    via junping_du)
+
   IMPROVEMENTS
 
   OPTIMIZATIONS

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -133,6 +133,7 @@ public class YarnConfiguration extends Configuration {
   public static final String RM_PREFIX = "yarn.resourcemanager.";
 
   public static final String RM_CLUSTER_ID = RM_PREFIX + "cluster-id";
+  public static final String DEFAULT_RM_CLUSTER_ID = "yarn_cluster";
 
   public static final String RM_HOSTNAME = RM_PREFIX + "hostname";
 

+ 26 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java

@@ -22,8 +22,10 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.Vector;
 
 import org.apache.commons.cli.CommandLine;
@@ -183,6 +185,9 @@ public class Client {
   // Timeline domain writer access control
   private String modifyACLs = null;
 
+  private String flowId = null;
+  private String flowRunId = null;
+
   // Command line options
   private Options opts;
 
@@ -256,7 +261,8 @@ public class Client {
     opts.addOption("shell_args", true, "Command line args for the shell script." +
         "Multiple args can be separated by empty space.");
     opts.getOption("shell_args").setArgs(Option.UNLIMITED_VALUES);
-    opts.addOption("shell_env", true, "Environment for shell script. Specified as env_key=env_val pairs");
+    opts.addOption("shell_env", true,
+        "Environment for shell script. Specified as env_key=env_val pairs");
     opts.addOption("shell_cmd_priority", true, "Priority for the shell command containers");
     opts.addOption("container_memory", true, "Amount of memory in MB to be requested to run the shell command");
     opts.addOption("container_vcores", true, "Amount of virtual cores to be requested to run the shell command");
@@ -283,6 +289,10 @@ public class Client {
         + "modify the timeline entities in the given domain");
     opts.addOption("create", false, "Flag to indicate whether to create the "
         + "domain specified with -domain.");
+    opts.addOption("flow", true, "ID of the flow which the distributed shell "
+        + "app belongs to");
+    opts.addOption("flow_run", true, "ID of the flowrun which the distributed "
+        + "shell app belongs to");
     opts.addOption("help", false, "Print usage");
     opts.addOption("node_label_expression", true,
         "Node label expression to determine the nodes"
@@ -442,6 +452,12 @@ public class Client {
       }
     }
 
+    if (cliParser.hasOption("flow")) {
+      flowId = cliParser.getOptionValue("flow");
+    }
+    if (cliParser.hasOption("flow_run")) {
+      flowRunId = cliParser.getOptionValue("flow_run");
+    }
     return true;
   }
 
@@ -533,6 +549,15 @@ public class Client {
         .setAttemptFailuresValidityInterval(attemptFailuresValidityInterval);
     }
 
+    Set<String> tags = new HashSet<String>();
+    if (flowId != null) {
+      tags.add(TimelineUtils.generateFlowIdTag(flowId));
+    }
+    if (flowRunId != null) {
+      tags.add(TimelineUtils.generateFlowRunIdTag(flowRunId));
+    }
+    appContext.setApplicationTags(tags);
+
     // set local resources for the application master
     // local files or archives as needed
     // In this scenario, the jar file for the application master is part of the local resources			

+ 74 - 51
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java

@@ -33,6 +33,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -40,6 +41,7 @@ import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.ServerSocketUtil;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.JarFinder;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -53,6 +55,7 @@ import org.apache.hadoop.yarn.server.MiniYARNCluster;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService;
 import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -169,20 +172,26 @@ public class TestDistributedShell {
   
   @Test(timeout=90000)
   public void testDSShellWithDomain() throws Exception {
-    testDSShell(true, "v1");
+    testDSShell(true, "v1", true);
   }
 
   @Test(timeout=90000)
   public void testDSShellWithoutDomain() throws Exception {
-    testDSShell(false, "v1");
+    testDSShell(false, "v1", true);
   }
 
   @Test(timeout=90000)
-  public void testDSShellWithoutDomainV2() throws Exception {
-    testDSShell(false, "v2");
+  public void testDSShellWithoutDomainV2DefaultFlow() throws Exception {
+    testDSShell(false, "v2", true);
   }
 
-  public void testDSShell(boolean haveDomain, String timelineVersion)
+  @Test(timeout=90000)
+  public void testDSShellWithoutDomainV2CustomizedFlow() throws Exception {
+    testDSShell(false, "v2", false);
+  }
+
+  public void testDSShell(boolean haveDomain, String timelineVersion,
+      boolean defaultFlow)
       throws Exception {
     String[] args = {
         "--jar",
@@ -220,6 +229,15 @@ public class TestDistributedShell {
       };
       isTestingTimelineV2 = true;
       args = mergeArgs(args, timelineArgs);
+      if (!defaultFlow) {
+        String[] flowArgs = {
+            "--flow",
+            "test_flow_id",
+            "--flow_run",
+            "12345678"
+        };
+        args = mergeArgs(args, flowArgs);
+      }
       LOG.info("Setup: Using timeline v2!");
     }
 
@@ -279,7 +297,7 @@ public class TestDistributedShell {
     if (!isTestingTimelineV2) {
       checkTimelineV1(haveDomain);
     } else {
-      checkTimelineV2(haveDomain, appId);
+      checkTimelineV2(haveDomain, appId, defaultFlow);
     }
   }
 
@@ -328,53 +346,58 @@ public class TestDistributedShell {
     }
   }
 
-  private void checkTimelineV2(boolean haveDomain, ApplicationId appId) {
-    // For PoC check in /tmp/ YARN-3264
-    String tmpRoot = FileSystemTimelineWriterImpl.DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT;
+  private void checkTimelineV2(
+      boolean haveDomain, ApplicationId appId, boolean defaultFlow)
+      throws Exception {
+    // For PoC check in /tmp/timeline_service_data YARN-3264
+    String tmpRoot =
+        FileSystemTimelineWriterImpl.DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT
+            + "/entities/";
 
     File tmpRootFolder = new File(tmpRoot);
-    Assert.assertTrue(tmpRootFolder.isDirectory());
-
-    // for this test, we expect DS_APP_ATTEMPT AND DS_CONTAINER dirs
-    String outputDirApp = tmpRoot + "/DS_APP_ATTEMPT/";
-
-    File entityFolder = new File(outputDirApp);
-    Assert.assertTrue(entityFolder.isDirectory());
-
-    // there will be at least one attempt, look for that file
-    String appTimestampFileName = "appattempt_" + appId.getClusterTimestamp()
-        + "_000" + appId.getId() + "_000001"
-        + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
-    String appAttemptFileName = outputDirApp + appTimestampFileName;
-    File appAttemptFile = new File(appAttemptFileName);
-    Assert.assertTrue(appAttemptFile.exists());
-
-    String outputDirContainer = tmpRoot + "/DS_CONTAINER/";
-    File containerFolder = new File(outputDirContainer);
-    Assert.assertTrue(containerFolder.isDirectory());
-
-    String containerTimestampFileName = "container_"
-        + appId.getClusterTimestamp() + "_000" + appId.getId()
-        + "_01_000002.thist";
-    String containerFileName = outputDirContainer + containerTimestampFileName;
-    File containerFile = new File(containerFileName);
-    Assert.assertTrue(containerFile.exists());
-    String appTimeStamp = appId.getClusterTimestamp() + "_" + appId.getId()
-        + "_";
-    deleteAppFiles(new File(outputDirApp), appTimeStamp);
-    deleteAppFiles(new File(outputDirContainer), appTimeStamp);
-    tmpRootFolder.delete();
-  }
-
-  private void deleteAppFiles(File rootDir, String appTimeStamp) {
-    boolean deleted = false;
-    File[] listOfFiles = rootDir.listFiles();
-    for (File f1 : listOfFiles) {
-      // list all attempts for this app and delete them
-      if (f1.getName().contains(appTimeStamp)){
-        deleted = f1.delete();
-        Assert.assertTrue(deleted);
-      }
+    try {
+      Assert.assertTrue(tmpRootFolder.isDirectory());
+
+      // for this test, we expect DS_APP_ATTEMPT AND DS_CONTAINER dirs
+      String outputDirApp = tmpRoot +
+          YarnConfiguration.DEFAULT_RM_CLUSTER_ID + "/" +
+          UserGroupInformation.getCurrentUser().getShortUserName() +
+          (defaultFlow ? "/" +
+              TimelineUtils.generateDefaultFlowIdBasedOnAppId(appId) +
+              "/0/" : "/test_flow_id/12345678/") +
+          appId.toString() + "/DS_APP_ATTEMPT/";
+
+      File entityFolder = new File(outputDirApp);
+      Assert.assertTrue(entityFolder.isDirectory());
+
+      // there will be at least one attempt, look for that file
+      String appTimestampFileName = "appattempt_" + appId.getClusterTimestamp()
+          + "_000" + appId.getId() + "_000001"
+          + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+      String appAttemptFileName = outputDirApp + appTimestampFileName;
+      File appAttemptFile = new File(appAttemptFileName);
+      Assert.assertTrue(appAttemptFile.exists());
+
+      String outputDirContainer = tmpRoot +
+          YarnConfiguration.DEFAULT_RM_CLUSTER_ID + "/" +
+          UserGroupInformation.getCurrentUser().getShortUserName() +
+          (defaultFlow ? "/" +
+              TimelineUtils.generateDefaultFlowIdBasedOnAppId(appId) +
+              "/0/" : "/test_flow_id/12345678/") +
+          appId.toString() + "/DS_CONTAINER/";
+      File containerFolder = new File(outputDirContainer);
+      Assert.assertTrue(containerFolder.isDirectory());
+
+      String containerTimestampFileName = "container_"
+          + appId.getClusterTimestamp() + "_000" + appId.getId()
+          + "_01_000002.thist";
+      String containerFileName = outputDirContainer + containerTimestampFileName;
+      File containerFile = new File(containerFileName);
+      Assert.assertTrue(containerFile.exists());
+      String appTimeStamp = appId.getClusterTimestamp() + "_" + appId.getId()
+          + "_";
+    } finally {
+      FileUtils.deleteDirectory(tmpRootFolder.getParentFile());
     }
   }
 

+ 16 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java

@@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.util.VersionInfo;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineAbout;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.util.YarnVersionInfo;
@@ -43,6 +44,9 @@ import org.codehaus.jackson.map.ObjectMapper;
 @Evolving
 public class TimelineUtils {
 
+  public static final String FLOW_ID_TAG_PREFIX = "TIMELINE_FLOW_ID_TAG";
+  public static final String FLOW_RUN_ID_TAG_PREFIX = "TIMELINE_FLOW_RUN_ID_TAG";
+
   private static ObjectMapper mapper;
 
   static {
@@ -119,4 +123,16 @@ public class TimelineUtils {
         getTimelineTokenServiceAddress(conf);
     return SecurityUtil.buildTokenService(timelineServiceAddr);
   }
+
+  public static String generateDefaultFlowIdBasedOnAppId(ApplicationId appId) {
+    return "flow_" + appId.getClusterTimestamp() + "_" + appId.getId();
+  }
+
+  public static String generateFlowIdTag(String flowId) {
+    return FLOW_ID_TAG_PREFIX + ":" + flowId;
+  }
+
+  public static String generateFlowRunIdTag(String flowRunId) {
+    return FLOW_RUN_ID_TAG_PREFIX + ":" + flowRunId;
+  }
 }

+ 16 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/CollectorNodemanagerProtocol.java

@@ -21,6 +21,8 @@ import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse;
 
@@ -54,4 +56,18 @@ public interface CollectorNodemanagerProtocol {
       ReportNewCollectorInfoRequest request)
       throws YarnException, IOException;
 
+  /**
+   * <p>
+   * The collector needs to get the context information including user, flow
+   * and flow run ID to associate with every incoming put-entity requests.
+   * </p>
+   * @param request the request of getting the aggregator context information of
+   *                the given application
+   * @return
+   * @throws YarnException
+   * @throws IOException
+   */
+  GetTimelineCollectorContextResponse getTimelineCollectorContext(
+      GetTimelineCollectorContextRequest request)
+      throws YarnException, IOException;
 }

+ 20 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/CollectorNodemanagerProtocolPBClientImpl.java

@@ -30,11 +30,16 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.GetTimelineCollectorContextRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoRequestProto;
 import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
 import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocolPB;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetTimelineCollectorContextRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetTimelineCollectorContextResponsePBImpl;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewCollectorInfoRequestPBImpl;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewCollectorInfoResponsePBImpl;
 
@@ -84,6 +89,21 @@ public class CollectorNodemanagerProtocolPBClientImpl implements
     }
   }
 
+  @Override
+  public GetTimelineCollectorContextResponse getTimelineCollectorContext(
+      GetTimelineCollectorContextRequest request)
+      throws YarnException, IOException {
+    GetTimelineCollectorContextRequestProto requestProto =
+        ((GetTimelineCollectorContextRequestPBImpl) request).getProto();
+    try {
+      return new GetTimelineCollectorContextResponsePBImpl(
+          proxy.getTimelineCollectorContext(null, requestProto));
+    } catch (ServiceException e) {
+      RPCUtil.unwrapAndThrowException(e);
+      return null;
+    }
+  }
+
   @Override
   public void close() {
     if (this.proxy != null) {

+ 21 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/CollectorNodemanagerProtocolPBServiceImpl.java

@@ -20,11 +20,16 @@ package org.apache.hadoop.yarn.server.api.impl.pb.service;
 import java.io.IOException;
 
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.GetTimelineCollectorContextRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.GetTimelineCollectorContextResponseProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoResponseProto;
 import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
 import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocolPB;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetTimelineCollectorContextRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetTimelineCollectorContextResponsePBImpl;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewCollectorInfoRequestPBImpl;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewCollectorInfoResponsePBImpl;
 
@@ -56,4 +61,20 @@ public class CollectorNodemanagerProtocolPBServiceImpl implements
     }
   }
 
+  @Override
+  public GetTimelineCollectorContextResponseProto getTimelineCollectorContext(
+      RpcController controller,
+      GetTimelineCollectorContextRequestProto proto) throws ServiceException {
+    GetTimelineCollectorContextRequestPBImpl request =
+        new GetTimelineCollectorContextRequestPBImpl(proto);
+    try {
+      GetTimelineCollectorContextResponse response =
+          real.getTimelineCollectorContext(request);
+      return ((GetTimelineCollectorContextResponsePBImpl)response).getProto();
+    } catch (YarnException e) {
+      throw new ServiceException(e);
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
 }

+ 37 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/GetTimelineCollectorContextRequest.java

@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.util.Records;
+
+public abstract class GetTimelineCollectorContextRequest {
+
+  public static GetTimelineCollectorContextRequest newInstance(
+      ApplicationId appId) {
+    GetTimelineCollectorContextRequest request =
+        Records.newRecord(GetTimelineCollectorContextRequest.class);
+    request.setApplicationId(appId);
+    return request;
+  }
+
+  public abstract ApplicationId getApplicationId();
+
+  public abstract void setApplicationId(ApplicationId appId);
+}

+ 46 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/GetTimelineCollectorContextResponse.java

@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+
+import org.apache.hadoop.yarn.util.Records;
+
+public abstract class GetTimelineCollectorContextResponse {
+
+  public static GetTimelineCollectorContextResponse newInstance(
+      String userId, String flowId, String flowRunId) {
+    GetTimelineCollectorContextResponse response =
+        Records.newRecord(GetTimelineCollectorContextResponse.class);
+    response.setUserId(userId);
+    response.setFlowId(flowId);
+    response.setFlowRunId(flowRunId);
+    return response;
+  }
+
+  public abstract String getUserId();
+
+  public abstract void setUserId(String userId);
+
+  public abstract String getFlowId();
+
+  public abstract void setFlowId(String flowId);
+
+  public abstract String getFlowRunId();
+
+  public abstract void setFlowRunId(String flowRunId);
+}

+ 127 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/GetTimelineCollectorContextRequestPBImpl.java

@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+
+
+import com.google.protobuf.TextFormat;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.GetTimelineCollectorContextRequestProtoOrBuilder;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.GetTimelineCollectorContextRequestProto;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
+
+public class GetTimelineCollectorContextRequestPBImpl extends
+    GetTimelineCollectorContextRequest {
+
+  GetTimelineCollectorContextRequestProto
+      proto = GetTimelineCollectorContextRequestProto.getDefaultInstance();
+  GetTimelineCollectorContextRequestProto.Builder builder = null;
+  boolean viaProto = false;
+
+  private ApplicationId appId = null;
+
+  public GetTimelineCollectorContextRequestPBImpl() {
+    builder = GetTimelineCollectorContextRequestProto.newBuilder();
+  }
+
+  public GetTimelineCollectorContextRequestPBImpl(
+      GetTimelineCollectorContextRequestProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  public GetTimelineCollectorContextRequestProto getProto() {
+    mergeLocalToProto();
+    proto = viaProto ? proto : builder.build();
+    viaProto = true;
+    return proto;
+  }
+
+  @Override
+  public int hashCode() {
+    return getProto().hashCode();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other == null)
+      return false;
+    if (other.getClass().isAssignableFrom(this.getClass())) {
+      return this.getProto().equals(this.getClass().cast(other).getProto());
+    }
+    return false;
+  }
+
+  @Override
+  public String toString() {
+    return TextFormat.shortDebugString(getProto());
+  }
+
+  private void mergeLocalToBuilder() {
+    if (appId != null) {
+      builder.setAppId(convertToProtoFormat(this.appId));
+    }
+  }
+
+  private void mergeLocalToProto() {
+    if (viaProto)
+      maybeInitBuilder();
+    mergeLocalToBuilder();
+    proto = builder.build();
+    viaProto = true;
+  }
+
+  private void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = GetTimelineCollectorContextRequestProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+
+  @Override
+  public ApplicationId getApplicationId() {
+    if (this.appId != null) {
+      return this.appId;
+    }
+
+    GetTimelineCollectorContextRequestProtoOrBuilder p = viaProto ? proto : builder;
+    if (!p.hasAppId()) {
+      return null;
+    }
+
+    this.appId = convertFromProtoFormat(p.getAppId());
+    return this.appId;
+  }
+
+  @Override
+  public void setApplicationId(ApplicationId appId) {
+    maybeInitBuilder();
+    if (appId == null)
+      builder.clearAppId();
+    this.appId = appId;
+  }
+
+  private ApplicationIdPBImpl convertFromProtoFormat(YarnProtos.ApplicationIdProto p) {
+    return new ApplicationIdPBImpl(p);
+  }
+
+  private YarnProtos.ApplicationIdProto convertToProtoFormat(ApplicationId t) {
+    return ((ApplicationIdPBImpl)t).getProto();
+  }
+}

+ 141 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/GetTimelineCollectorContextResponsePBImpl.java

@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+
+
+import com.google.protobuf.TextFormat;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.GetTimelineCollectorContextResponseProtoOrBuilder;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.GetTimelineCollectorContextResponseProto;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
+
+public class GetTimelineCollectorContextResponsePBImpl extends
+    GetTimelineCollectorContextResponse {
+
+  GetTimelineCollectorContextResponseProto proto =
+      GetTimelineCollectorContextResponseProto.getDefaultInstance();
+  GetTimelineCollectorContextResponseProto.Builder builder = null;
+  boolean viaProto = false;
+
+  public GetTimelineCollectorContextResponsePBImpl() {
+    builder = GetTimelineCollectorContextResponseProto.newBuilder();
+  }
+
+  public GetTimelineCollectorContextResponsePBImpl(
+      GetTimelineCollectorContextResponseProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  public GetTimelineCollectorContextResponseProto getProto() {
+    mergeLocalToProto();
+    proto = viaProto ? proto : builder.build();
+    viaProto = true;
+    return proto;
+  }
+
+  @Override
+  public int hashCode() {
+    return getProto().hashCode();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other == null)
+      return false;
+    if (other.getClass().isAssignableFrom(this.getClass())) {
+      return this.getProto().equals(this.getClass().cast(other).getProto());
+    }
+    return false;
+  }
+
+  @Override
+  public String toString() {
+    return TextFormat.shortDebugString(getProto());
+  }
+
+  private void mergeLocalToProto() {
+    if (viaProto)
+      maybeInitBuilder();
+    proto = builder.build();
+    viaProto = true;
+  }
+
+  private void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = GetTimelineCollectorContextResponseProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+
+  @Override
+  public String getUserId() {
+    GetTimelineCollectorContextResponseProtoOrBuilder p = viaProto ? proto : builder;
+    if (!p.hasUserId()) {
+      return null;
+    }
+    return p.getUserId();
+  }
+
+  @Override
+  public void setUserId(String userId) {
+    maybeInitBuilder();
+    if (userId == null) {
+      builder.clearUserId();
+      return;
+    }
+    builder.setUserId(userId);
+  }
+
+  @Override
+  public String getFlowId() {
+    GetTimelineCollectorContextResponseProtoOrBuilder p = viaProto ? proto : builder;
+    if (!p.hasFlowId()) {
+      return null;
+    }
+    return p.getFlowId();
+  }
+
+  @Override
+  public void setFlowId(String flowId) {
+    maybeInitBuilder();
+    if (flowId == null) {
+      builder.clearFlowId();
+      return;
+    }
+    builder.setFlowId(flowId);
+  }
+
+  @Override
+  public String getFlowRunId() {
+    GetTimelineCollectorContextResponseProtoOrBuilder p = viaProto ? proto : builder;
+    if (!p.hasFlowRunId()) {
+      return null;
+    }
+    return p.getFlowRunId();
+  }
+
+  @Override
+  public void setFlowRunId(String flowRunId) {
+    maybeInitBuilder();
+    if (flowRunId == null) {
+      builder.clearFlowRunId();
+      return;
+    }
+    builder.setFlowRunId(flowRunId);
+  }
+}

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/collectornodemanager_protocol.proto

@@ -26,4 +26,5 @@ import "yarn_server_common_service_protos.proto";
 
 service CollectorNodemanagerProtocolService {
   rpc reportNewCollectorInfo (ReportNewCollectorInfoRequestProto) returns (ReportNewCollectorInfoResponseProto);
+  rpc getTimelineCollectorContext (GetTimelineCollectorContextRequestProto) returns (GetTimelineCollectorContextResponseProto);
 }

+ 9 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto

@@ -112,6 +112,15 @@ message ReportNewCollectorInfoRequestProto {
 message ReportNewCollectorInfoResponseProto {
 }
 
+message GetTimelineCollectorContextRequestProto {
+  optional ApplicationIdProto appId = 1;
+}
+
+message GetTimelineCollectorContextResponseProto {
+  optional string user_id = 1;
+  optional string flow_id = 2;
+  optional string flow_run_id = 3;
+}
 
 message NMContainerStatusProto {
   optional ContainerIdProto container_id = 1;

+ 39 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java

@@ -62,6 +62,8 @@ import org.apache.hadoop.yarn.ipc.RPCUtil;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse;
 import org.apache.hadoop.yarn.server.api.records.AppCollectorsMap;
@@ -168,6 +170,31 @@ public class TestRPC {
       Assert.assertTrue(e.getMessage().contains(ILLEGAL_NUMBER_MESSAGE));
     }
 
+    // Verify request with a valid app ID
+    try {
+      GetTimelineCollectorContextRequest request =
+          GetTimelineCollectorContextRequest.newInstance(
+              ApplicationId.newInstance(0, 1));
+      GetTimelineCollectorContextResponse response =
+          proxy.getTimelineCollectorContext(request);
+      Assert.assertEquals("test_user_id", response.getUserId());
+      Assert.assertEquals("test_flow_id", response.getFlowId());
+      Assert.assertEquals("test_flow_run_id", response.getFlowRunId());
+    } catch (YarnException | IOException e) {
+      Assert.fail("RPC call failured is not expected here.");
+    }
+
+    // Verify request with an invalid app ID
+    try {
+      GetTimelineCollectorContextRequest request =
+          GetTimelineCollectorContextRequest.newInstance(
+              ApplicationId.newInstance(0, 2));
+      proxy.getTimelineCollectorContext(request);
+      Assert.fail("RPC call failured is expected here.");
+    } catch (YarnException | IOException e) {
+      Assert.assertTrue(e instanceof  YarnException);
+      Assert.assertTrue(e.getMessage().contains("The application is not found."));
+    }
     server.stop();
   }
 
@@ -348,6 +375,18 @@ public class TestRPC {
           recordFactory.newRecordInstance(ReportNewCollectorInfoResponse.class);
       return response;
     }
+
+    @Override
+    public GetTimelineCollectorContextResponse getTimelineCollectorContext(
+        GetTimelineCollectorContextRequest request)
+        throws  YarnException, IOException {
+      if (request.getApplicationId().getId() == 1) {
+         return GetTimelineCollectorContextResponse.newInstance(
+                "test_user_id", "test_flow_id", "test_flow_run_id");
+      } else {
+        throw new YarnException("The application is not found.");
+      }
+    }
   }
 
 }

+ 17 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java

@@ -30,13 +30,17 @@ import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse;
 import org.apache.hadoop.yarn.server.api.records.AppCollectorsMap;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 
 public class NMCollectorService extends CompositeService implements
     CollectorNodemanagerProtocol {
@@ -93,7 +97,7 @@ public class NMCollectorService extends CompositeService implements
 
   @Override
   public ReportNewCollectorInfoResponse reportNewCollectorInfo(
-      ReportNewCollectorInfoRequest request) throws IOException {
+      ReportNewCollectorInfoRequest request) throws YarnException, IOException {
     List<AppCollectorsMap> newCollectorsList = request.getAppCollectorsList();
     if (newCollectorsList != null && !newCollectorsList.isEmpty()) {
       Map<ApplicationId, String> newCollectorsMap =
@@ -107,4 +111,16 @@ public class NMCollectorService extends CompositeService implements
     return ReportNewCollectorInfoResponse.newInstance();
   }
 
+  @Override
+  public GetTimelineCollectorContextResponse getTimelineCollectorContext(
+      GetTimelineCollectorContextRequest request)
+      throws YarnException, IOException {
+    Application app = context.getApplications().get(request.getApplicationId());
+    if (app == null) {
+      throw new YarnException("Application " + request.getApplicationId() +
+          " doesn't exist on NM.");
+    }
+    return GetTimelineCollectorContextResponse.newInstance(
+        app.getUser(), app.getFlowId(), app.getFlowRunId());
+  }
 }

+ 11 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java

@@ -144,10 +144,11 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.Re
 import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.ByteString;
-import org.apache.hadoop.yarn.util.resource.Resources;
 
 public class ContainerManagerImpl extends CompositeService implements
     ServiceStateChangeListener, ContainerManagementProtocol,
@@ -323,8 +324,9 @@ public class ContainerManagerImpl extends CompositeService implements
     }
 
     LOG.info("Recovering application " + appId);
-    ApplicationImpl app = new ApplicationImpl(dispatcher, p.getUser(), appId,
-        creds, context);
+    //TODO: Recover flow and flow run ID
+    ApplicationImpl app = new ApplicationImpl(
+        dispatcher, p.getUser(), null, null, appId, creds, context);
     context.getApplications().put(appId, app);
     app.handle(new ApplicationInitEvent(appId, acls, logAggregationContext));
   }
@@ -923,8 +925,12 @@ public class ContainerManagerImpl extends CompositeService implements
     try {
       if (!serviceStopped) {
         // Create the application
-        Application application =
-            new ApplicationImpl(dispatcher, user, applicationID, credentials, context);
+        String flowId = launchContext.getEnvironment().get(
+            TimelineUtils.FLOW_ID_TAG_PREFIX);
+        String flowRunId = launchContext.getEnvironment().get(
+            TimelineUtils.FLOW_RUN_ID_TAG_PREFIX);
+        Application application = new ApplicationImpl(
+            dispatcher, user, flowId, flowRunId, applicationID, credentials, context);
         if (null == context.getApplications().putIfAbsent(applicationID,
           application)) {
           LOG.info("Creating a new application reference for app " + applicationID);

+ 4 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java

@@ -35,4 +35,8 @@ public interface Application extends EventHandler<ApplicationEvent> {
 
   ApplicationState getApplicationState();
 
+  String getFlowId();
+
+  String getFlowRunId();
+
 }

+ 15 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java

@@ -64,6 +64,8 @@ public class ApplicationImpl implements Application {
 
   final Dispatcher dispatcher;
   final String user;
+  final String flowId;
+  final String flowRunId;
   final ApplicationId appId;
   final Credentials credentials;
   Map<ApplicationAccessType, String> applicationACLs;
@@ -79,10 +81,13 @@ public class ApplicationImpl implements Application {
   Map<ContainerId, Container> containers =
       new HashMap<ContainerId, Container>();
 
-  public ApplicationImpl(Dispatcher dispatcher, String user, ApplicationId appId,
-      Credentials credentials, Context context) {
+  public ApplicationImpl(Dispatcher dispatcher, String user, String flowId,
+      String flowRunId, ApplicationId appId, Credentials credentials,
+      Context context) {
     this.dispatcher = dispatcher;
     this.user = user;
+    this.flowId = flowId;
+    this.flowRunId = flowRunId;
     this.appId = appId;
     this.credentials = credentials;
     this.aclsManager = context.getApplicationACLsManager();
@@ -487,4 +492,12 @@ public class ApplicationImpl implements Application {
       this.readLock.unlock();
     }
   }
+
+  public String getFlowId() {
+    return flowId;
+  }
+
+  public String getFlowRunId() {
+    return flowRunId;
+  }
 }

+ 2 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java

@@ -530,7 +530,8 @@ public class TestApplication {
       this.user = user;
       this.appId = BuilderUtils.newApplicationId(timestamp, id);
 
-      app = new ApplicationImpl(dispatcher, this.user, appId, null, context);
+      app = new ApplicationImpl(
+          dispatcher, this.user, null, null, appId, null, context);
       containers = new ArrayList<Container>();
       for (int i = 0; i < numContainers; i++) {
         Container container = createMockedContainer(this.appId, i);

+ 10 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockApp.java

@@ -39,6 +39,9 @@ public class MockApp implements Application {
   Map<ContainerId, Container> containers = new HashMap<ContainerId, Container>();
   ApplicationState appState;
   Application app;
+  String flowId;
+  String flowRunId;
+
 
   public MockApp(int uniqId) {
     this("mockUser", 1234, uniqId);
@@ -77,4 +80,11 @@ public class MockApp implements Application {
 
   public void handle(ApplicationEvent event) {}
 
+  public String getFlowId() {
+    return flowId;
+  }
+
+  public String getFlowRunId() {
+    return flowRunId;
+  }
 }

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java

@@ -327,7 +327,7 @@ public class TestNMWebServices extends JerseyTestBase {
     final String filename = "logfile1";
     final String logMessage = "log message\n";
     nmContext.getApplications().put(appId, new ApplicationImpl(null, "user",
-        appId, null, nmContext));
+        null, null, appId, null, nmContext));
     
     MockContainer container = new MockContainer(appAttemptId,
         new AsyncDispatcher(), new Configuration(), "user", appId, 1);

+ 22 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java

@@ -34,8 +34,8 @@ import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
@@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptE
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
 import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -214,6 +215,26 @@ public class AMLauncher implements Runnable {
             .get(applicationId)
             .getSubmitTime()));
 
+    // Set flow context info
+    for (String tag :
+        rmContext.getRMApps().get(applicationId).getApplicationTags()) {
+      if (tag.startsWith(TimelineUtils.FLOW_ID_TAG_PREFIX  + ":") ||
+          tag.startsWith(TimelineUtils.FLOW_ID_TAG_PREFIX.toLowerCase() + ":")) {
+        String value = tag.substring(
+            TimelineUtils.FLOW_ID_TAG_PREFIX.length() + 1);
+        if (!value.isEmpty()) {
+          environment.put(TimelineUtils.FLOW_ID_TAG_PREFIX, value);
+        }
+      }
+      if (tag.startsWith(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX  + ":") ||
+          tag.startsWith(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX.toLowerCase() + ":")) {
+        String value = tag.substring(
+            TimelineUtils.FLOW_RUN_ID_TAG_PREFIX.length() + 1);
+        if (!value.isEmpty()) {
+          environment.put(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX, value);
+        }
+      }
+    }
     Credentials credentials = new Credentials();
     DataInputByteBuffer dibb = new DataInputByteBuffer();
     ByteBuffer tokens = container.getTokens();

+ 7 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollector.java

@@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsEventType;
 import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
 
 /**
  * This class is responsible for posting application and appattempt lifecycle
@@ -87,6 +88,12 @@ public class RMTimelineCollector extends TimelineCollector {
       LOG.error("Unknown SystemMetricsEvent type: " + event.getType());
     }
   }
+  
+  @Override
+  protected TimelineCollectorContext getTimelineEntityContext() {
+    // TODO address in YARN-3390.
+    return null;
+  }
 
   /**
    * EventHandler implementation which forward events to SystemMetricsPublisher.

+ 18 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java

@@ -20,20 +20,27 @@ package org.apache.hadoop.yarn.server.timelineservice;
 
 
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.client.api.TimelineClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
 import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService;
 import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorManager;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.io.IOException;
+
 public class TestTimelineServiceClientIntegration {
   private static TimelineCollectorManager collectorManager;
   private static PerNodeTimelineCollectorsAuxService auxService;
@@ -86,7 +93,17 @@ public class TestTimelineServiceClientIntegration {
 
     @Override
     protected CollectorNodemanagerProtocol getNMCollectorService() {
-      return mock(CollectorNodemanagerProtocol.class);
+      CollectorNodemanagerProtocol protocol =
+          mock(CollectorNodemanagerProtocol.class);
+      try {
+        GetTimelineCollectorContextResponse response =
+            GetTimelineCollectorContextResponse.newInstance(null, null, null);
+        when(protocol.getTimelineCollectorContext(any(
+            GetTimelineCollectorContextRequest.class))).thenReturn(response);
+      } catch (YarnException | IOException e) {
+        fail();
+      }
+      return protocol;
     }
   }
 }

+ 28 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java

@@ -18,9 +18,14 @@
 
 package org.apache.hadoop.yarn.server.timelineservice.collector;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 
 /**
  * Service that handles writes to the timeline service and writes them to the
@@ -31,16 +36,29 @@ import org.apache.hadoop.conf.Configuration;
 @Private
 @Unstable
 public class AppLevelTimelineCollector extends TimelineCollector {
-  private final String applicationId;
-  // TODO define key metadata such as flow metadata, user, and queue
+  private final ApplicationId appId;
+  private final TimelineCollectorContext context;
 
-  public AppLevelTimelineCollector(String applicationId) {
-    super(AppLevelTimelineCollector.class.getName() + " - " + applicationId);
-    this.applicationId = applicationId;
+  public AppLevelTimelineCollector(ApplicationId appId) {
+    super(AppLevelTimelineCollector.class.getName() + " - " + appId.toString());
+    Preconditions.checkNotNull(appId, "AppId shouldn't be null");
+    this.appId = appId;
+    context = new TimelineCollectorContext();
   }
 
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
+    context.setClusterId(conf.get(YarnConfiguration.RM_CLUSTER_ID,
+        YarnConfiguration.DEFAULT_RM_CLUSTER_ID));
+    // Set the default values, which will be updated with an RPC call to get the
+    // context info from NM.
+    // Current user usually is not the app user, but keep this field non-null
+    context.setUserId(UserGroupInformation.getCurrentUser().getShortUserName());
+    // Use app ID to generate a default flow ID for orphan app
+    context.setFlowId(TimelineUtils.generateDefaultFlowIdBasedOnAppId(appId));
+    // Set the flow run ID to 0 if it's an orphan app
+    context.setFlowRunId("0");
+    context.setAppId(appId.toString());
     super.serviceInit(conf);
   }
 
@@ -54,4 +72,9 @@ public class AppLevelTimelineCollector extends TimelineCollector {
     super.serviceStop();
   }
 
+  @Override
+  protected TimelineCollectorContext getTimelineEntityContext() {
+    return context;
+  }
+
 }

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java

@@ -95,7 +95,7 @@ public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService {
    */
   public boolean addApplication(ApplicationId appId) {
     AppLevelTimelineCollector collector =
-        new AppLevelTimelineCollector(appId.toString());
+        new AppLevelTimelineCollector(appId);
     return (collectorManager.putIfAbsent(appId, collector)
         == collector);
   }

+ 13 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java

@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
+
 /**
  * Service that handles writes to the timeline service and writes them to the
  * backing storage.
@@ -83,21 +84,24 @@ public abstract class TimelineCollector extends CompositeService {
    *
    * This method should be reserved for selected critical entities and events.
    * For normal voluminous writes one should use the async method
-   * {@link #postEntitiesAsync(TimelineEntities, UserGroupInformation)}.
+   * {@link #putEntitiesAsync(TimelineEntities, UserGroupInformation)}.
    *
    * @param entities entities to post
    * @param callerUgi the caller UGI
    * @return the response that contains the result of the post.
    */
-  public TimelineWriteResponse postEntities(TimelineEntities entities,
+  public TimelineWriteResponse putEntities(TimelineEntities entities,
       UserGroupInformation callerUgi) throws IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("SUCCESS - TIMELINE V2 PROTOTYPE");
-      LOG.debug("postEntities(entities=" + entities + ", callerUgi="
+      LOG.debug("putEntities(entities=" + entities + ", callerUgi="
           + callerUgi + ")");
     }
 
-    return writer.write(entities);
+    TimelineCollectorContext context = getTimelineEntityContext();
+    return writer.write(context.getClusterId(), context.getUserId(),
+        context.getFlowId(), context.getFlowRunId(), context.getAppId(),
+        entities);
   }
 
   /**
@@ -111,12 +115,15 @@ public abstract class TimelineCollector extends CompositeService {
    * @param entities entities to post
    * @param callerUgi the caller UGI
    */
-  public void postEntitiesAsync(TimelineEntities entities,
+  public void putEntitiesAsync(TimelineEntities entities,
       UserGroupInformation callerUgi) {
     // TODO implement
     if (LOG.isDebugEnabled()) {
-      LOG.debug("postEntitiesAsync(entities=" + entities + ", callerUgi=" +
+      LOG.debug("putEntitiesAsync(entities=" + entities + ", callerUgi=" +
           callerUgi + ")");
     }
   }
+
+  protected abstract TimelineCollectorContext getTimelineEntityContext();
+
 }

+ 81 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorContext.java

@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.collector;
+
+public class TimelineCollectorContext {
+
+  private String clusterId;
+  private String userId;
+  private String flowId;
+  private String flowRunId;
+  private String appId;
+
+  public TimelineCollectorContext() {
+    this(null, null, null, null, null);
+  }
+
+  public TimelineCollectorContext(String clusterId, String userId,
+      String flowId, String flowRunId, String appId) {
+    this.clusterId = clusterId;
+    this.userId = userId;
+    this.flowId = flowId;
+    this.flowRunId = flowRunId;
+    this.appId = appId;
+  }
+
+  public String getClusterId() {
+    return clusterId;
+  }
+
+  public void setClusterId(String clusterId) {
+    this.clusterId = clusterId;
+  }
+
+  public String getUserId() {
+    return userId;
+  }
+
+  public void setUserId(String userId) {
+    this.userId = userId;
+  }
+
+  public String getFlowId() {
+    return flowId;
+  }
+
+  public void setFlowId(String flowId) {
+    this.flowId = flowId;
+  }
+
+  public String getFlowRunId() {
+    return flowRunId;
+  }
+
+  public void setFlowRunId(String flowRunId) {
+    this.flowRunId = flowRunId;
+  }
+
+  public String getAppId() {
+    return appId;
+  }
+
+  public void setAppId(String appId) {
+    this.appId = appId;
+  }
+}

+ 28 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java

@@ -43,6 +43,8 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest;
 import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
 import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
@@ -102,6 +104,7 @@ public class TimelineCollectorManager extends CompositeService {
 
   @Override
   protected void serviceStart() throws Exception {
+    nmCollectorService = getNMCollectorService();
     startWebApp();
     super.serviceStart();
   }
@@ -151,11 +154,11 @@ public class TimelineCollectorManager extends CompositeService {
     // Report to NM if a new collector is added.
     if (collectorIsNew) {
       try {
+        updateTimelineCollectorContext(appId, collector);
         reportNewCollectorToNM(appId);
       } catch (Exception e) {
-        // throw exception here as it cannot be used if failed report to NM
-        LOG.error("Failed to report a new collector for application: " + appId +
-            " to the NM Collector Service.");
+        // throw exception here as it cannot be used if failed communicate with NM
+        LOG.error("Failed to communicate with NM Collector Service for " + appId);
         throw new YarnRuntimeException(e);
       }
     }
@@ -250,7 +253,6 @@ public class TimelineCollectorManager extends CompositeService {
 
   private void reportNewCollectorToNM(ApplicationId appId)
       throws YarnException, IOException {
-    this.nmCollectorService = getNMCollectorService();
     ReportNewCollectorInfoRequest request =
         ReportNewCollectorInfoRequest.newInstance(appId,
             this.timelineRestServerBindAddress);
@@ -259,6 +261,28 @@ public class TimelineCollectorManager extends CompositeService {
     nmCollectorService.reportNewCollectorInfo(request);
   }
 
+  private void updateTimelineCollectorContext(
+      ApplicationId appId, TimelineCollector collector)
+      throws YarnException, IOException {
+    GetTimelineCollectorContextRequest request =
+        GetTimelineCollectorContextRequest.newInstance(appId);
+    LOG.info("Get timeline collector context for " + appId);
+    GetTimelineCollectorContextResponse response =
+        nmCollectorService.getTimelineCollectorContext(request);
+    String userId = response.getUserId();
+    if (userId != null && !userId.isEmpty()) {
+      collector.getTimelineEntityContext().setUserId(userId);
+    }
+    String flowId = response.getFlowId();
+    if (flowId != null && !flowId.isEmpty()) {
+      collector.getTimelineEntityContext().setFlowId(flowId);
+    }
+    String flowRunId = response.getFlowRunId();
+    if (flowRunId != null && !flowRunId.isEmpty()) {
+      collector.getTimelineEntityContext().setFlowRunId(flowRunId);
+    }
+  }
+
   @VisibleForTesting
   protected CollectorNodemanagerProtocol getNMCollectorService() {
     Configuration conf = getConfig();

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java

@@ -138,7 +138,7 @@ public class TimelineCollectorWebService {
         LOG.error("Application not found");
         throw new NotFoundException(); // different exception?
       }
-      collector.postEntities(entities, callerUgi);
+      collector.putEntities(entities, callerUgi);
       return Response.ok().build();
     } catch (Exception e) {
       LOG.error("Error putting entities", e);

+ 32 - 37
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java

@@ -52,7 +52,9 @@ public class FileSystemTimelineWriterImpl extends AbstractService
 
   /** default value for storage location on local disk */
   public static final String DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT
-    = "/tmp/timeline_service_data/";
+    = "/tmp/timeline_service_data";
+
+  private static final String ENTITIES_DIR = "entities";
 
   /** Default extension for output files */
   public static final String TIMELINE_SERVICE_STORAGE_EXTENSION = ".thist";
@@ -61,38 +63,25 @@ public class FileSystemTimelineWriterImpl extends AbstractService
     super((FileSystemTimelineWriterImpl.class.getName()));
   }
 
-  /**
-   * Stores the entire information in {@link TimelineEntity} to the
-   * timeline store. Any errors occurring for individual write request objects
-   * will be reported in the response.
-   *
-   * @param data
-   *          a {@link TimelineEntity} object
-   * @return {@link TimelineWriteResponse} object.
-   * @throws IOException
-   */
   @Override
-  public TimelineWriteResponse write(TimelineEntities entities)
-      throws IOException {
+  public TimelineWriteResponse write(String clusterId, String userId,
+      String flowId, String flowRunId, String appId,
+      TimelineEntities entities) throws IOException {
     TimelineWriteResponse response = new TimelineWriteResponse();
     for (TimelineEntity entity : entities.getEntities()) {
-      write(entity, response);
+      write(clusterId, userId, flowId, flowRunId, appId, entity, response);
     }
     return response;
   }
 
-  private void write(TimelineEntity entity,
+  private void write(String clusterId, String userId,
+      String flowId, String flowRunId, String appId, TimelineEntity entity,
       TimelineWriteResponse response) throws IOException {
     PrintWriter out = null;
     try {
-      File outputDir = new File(outputRoot + entity.getType());
-      String fileName = outputDir + "/" + entity.getId()
-          + TIMELINE_SERVICE_STORAGE_EXTENSION;
-      if (!outputDir.exists()) {
-        if (!outputDir.mkdirs()) {
-          throw new IOException("Could not create directories for " + fileName);
-        }
-      }
+      String dir = mkdirs(outputRoot, ENTITIES_DIR, clusterId, userId,flowId,
+          flowRunId, appId, entity.getType());
+      String fileName = dir + entity.getId() + TIMELINE_SERVICE_STORAGE_EXTENSION;
       out = new PrintWriter(new BufferedWriter(new FileWriter(fileName, true)));
       out.println(TimelineUtils.dumpTimelineRecordtoJSON(entity));
       out.write("\n");
@@ -112,20 +101,7 @@ public class FileSystemTimelineWriterImpl extends AbstractService
     }
   }
 
-  /**
-   * Aggregates the entity information to the timeline store based on which
-   * track this entity is to be rolled up to The tracks along which aggregations
-   * are to be done are given by {@link TimelineAggregationTrack}
-   *
-   * Any errors occurring for individual write request objects will be reported
-   * in the response.
-   *
-   * @param data
-   *          a {@link TimelineEntity} object
-   *          a {@link TimelineAggregationTrack} enum value
-   * @return a {@link TimelineWriteResponse} object.
-   * @throws IOException
-   */
+  @Override
   public TimelineWriteResponse aggregate(TimelineEntity data,
       TimelineAggregationTrack track) throws IOException {
     return null;
@@ -141,4 +117,23 @@ public class FileSystemTimelineWriterImpl extends AbstractService
     outputRoot = conf.get(TIMELINE_SERVICE_STORAGE_DIR_ROOT,
         DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT);
   }
+
+  @Override
+  public void serviceStart() throws Exception {
+    mkdirs(outputRoot, ENTITIES_DIR);
+  }
+
+  private static String mkdirs(String... dirStrs) throws IOException {
+    StringBuilder path = new StringBuilder();
+    for (String dirStr : dirStrs) {
+      path.append(dirStr).append('/');
+      File dir = new File(path.toString());
+      if (!dir.exists()) {
+        if (!dir.mkdirs()) {
+          throw new IOException("Could not create directories for " + dir);
+        }
+      }
+    }
+    return path.toString();
+  }
 }

+ 8 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java

@@ -39,12 +39,19 @@ public interface TimelineWriter extends Service {
    * timeline store. Any errors occurring for individual write request objects
    * will be reported in the response.
    *
+   * @param clusterId context cluster ID
+   * @param userId context user ID
+   * @param flowId context flow ID
+   * @param flowRunId context flow run ID
+   * @param appId context app ID
    * @param data
    *          a {@link TimelineEntities} object.
    * @return a {@link TimelineWriteResponse} object.
    * @throws IOException
    */
-  TimelineWriteResponse write(TimelineEntities data) throws IOException;
+  TimelineWriteResponse write(String clusterId, String userId,
+      String flowId, String flowRunId, String appId,
+      TimelineEntities data) throws IOException;
 
   /**
    * Aggregates the entity information to the timeline store based on which

+ 31 - 12
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java

@@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
@@ -29,16 +30,25 @@ import static org.mockito.Mockito.when;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.ExitUtil;
+import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
 import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
 import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
+import org.junit.After;
 import org.junit.Test;
 
+import java.io.IOException;
+
 public class TestPerNodeTimelineCollectorsAuxService {
   private ApplicationAttemptId appAttemptId;
+  private PerNodeTimelineCollectorsAuxService auxService;
 
   public TestPerNodeTimelineCollectorsAuxService() {
     ApplicationId appId =
@@ -46,10 +56,16 @@ public class TestPerNodeTimelineCollectorsAuxService {
     appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
   }
 
+  @After
+  public void tearDown() throws Shell.ExitCodeException {
+    if (auxService != null) {
+      auxService.stop();
+    }
+  }
+
   @Test
   public void testAddApplication() throws Exception {
-    PerNodeTimelineCollectorsAuxService auxService =
-        createCollectorAndAddApplication();
+    auxService = createCollectorAndAddApplication();
     // auxService should have a single app
     assertTrue(auxService.hasApplication(
         appAttemptId.getApplicationId().toString()));
@@ -58,7 +74,7 @@ public class TestPerNodeTimelineCollectorsAuxService {
 
   @Test
   public void testAddApplicationNonAMContainer() throws Exception {
-    PerNodeTimelineCollectorsAuxService auxService = createCollector();
+    auxService = createCollector();
 
     ContainerId containerId = getContainerId(2L); // not an AM
     ContainerInitializationContext context =
@@ -72,8 +88,7 @@ public class TestPerNodeTimelineCollectorsAuxService {
 
   @Test
   public void testRemoveApplication() throws Exception {
-    PerNodeTimelineCollectorsAuxService auxService =
-        createCollectorAndAddApplication();
+    auxService = createCollectorAndAddApplication();
     // auxService should have a single app
     String appIdStr = appAttemptId.getApplicationId().toString();
     assertTrue(auxService.hasApplication(appIdStr));
@@ -90,8 +105,7 @@ public class TestPerNodeTimelineCollectorsAuxService {
 
   @Test
   public void testRemoveApplicationNonAMContainer() throws Exception {
-    PerNodeTimelineCollectorsAuxService auxService =
-        createCollectorAndAddApplication();
+    auxService = createCollectorAndAddApplication();
     // auxService should have a single app
     String appIdStr = appAttemptId.getApplicationId().toString();
     assertTrue(auxService.hasApplication(appIdStr));
@@ -109,7 +123,6 @@ public class TestPerNodeTimelineCollectorsAuxService {
   @Test(timeout = 60000)
   public void testLaunch() throws Exception {
     ExitUtil.disableSystemExit();
-    PerNodeTimelineCollectorsAuxService auxService = null;
     try {
       auxService =
           PerNodeTimelineCollectorsAuxService.launchServer(new String[0],
@@ -118,10 +131,6 @@ public class TestPerNodeTimelineCollectorsAuxService {
       assertEquals(0, e.status);
       ExitUtil.resetFirstExitException();
       fail();
-    } finally {
-      if (auxService != null) {
-        auxService.stop();
-      }
     }
   }
 
@@ -141,6 +150,8 @@ public class TestPerNodeTimelineCollectorsAuxService {
     TimelineCollectorManager collectorManager = createCollectorManager();
     PerNodeTimelineCollectorsAuxService auxService =
         spy(new PerNodeTimelineCollectorsAuxService(collectorManager));
+    auxService.init(new YarnConfiguration());
+    auxService.start();
     return auxService;
   }
 
@@ -150,6 +161,14 @@ public class TestPerNodeTimelineCollectorsAuxService {
     doReturn(new Configuration()).when(collectorManager).getConfig();
     CollectorNodemanagerProtocol nmCollectorService =
         mock(CollectorNodemanagerProtocol.class);
+    GetTimelineCollectorContextResponse response =
+        GetTimelineCollectorContextResponse.newInstance(null, null, null);
+    try {
+      when(nmCollectorService.getTimelineCollectorContext(any(
+          GetTimelineCollectorContextRequest.class))).thenReturn(response);
+    } catch (YarnException | IOException e) {
+      fail();
+    }
     doReturn(nmCollectorService).when(collectorManager).getNMCollectorService();
     return collectorManager;
   }

+ 35 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollectorManager.java

@@ -20,10 +20,14 @@ package org.apache.hadoop.yarn.server.timelineservice.collector;
 
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Callable;
@@ -33,15 +37,34 @@ import java.util.concurrent.Future;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 public class TestTimelineCollectorManager {
+  private TimelineCollectorManager collectorManager;
+
+  @Before
+  public void setup() throws Exception {
+    collectorManager = createCollectorManager();
+    collectorManager.init(new YarnConfiguration());
+    collectorManager.start();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (collectorManager != null) {
+      collectorManager.stop();
+    }
+  }
 
   @Test(timeout=60000)
   public void testMultithreadedAdd() throws Exception {
-    final TimelineCollectorManager collectorManager = createCollectorManager();
-
     final int NUM_APPS = 5;
     List<Callable<Boolean>> tasks = new ArrayList<Callable<Boolean>>();
     for (int i = 0; i < NUM_APPS; i++) {
@@ -49,7 +72,7 @@ public class TestTimelineCollectorManager {
       Callable<Boolean> task = new Callable<Boolean>() {
         public Boolean call() {
           AppLevelTimelineCollector collector =
-              new AppLevelTimelineCollector(appId.toString());
+              new AppLevelTimelineCollector(appId);
           return (collectorManager.putIfAbsent(appId, collector) == collector);
         }
       };
@@ -73,8 +96,6 @@ public class TestTimelineCollectorManager {
 
   @Test
   public void testMultithreadedAddAndRemove() throws Exception {
-    final TimelineCollectorManager collectorManager = createCollectorManager();
-
     final int NUM_APPS = 5;
     List<Callable<Boolean>> tasks = new ArrayList<Callable<Boolean>>();
     for (int i = 0; i < NUM_APPS; i++) {
@@ -82,7 +103,7 @@ public class TestTimelineCollectorManager {
       Callable<Boolean> task = new Callable<Boolean>() {
         public Boolean call() {
           AppLevelTimelineCollector collector =
-              new AppLevelTimelineCollector(appId.toString());
+              new AppLevelTimelineCollector(appId);
           boolean successPut =
               (collectorManager.putIfAbsent(appId, collector) == collector);
           return successPut && collectorManager.remove(appId.toString());
@@ -112,6 +133,14 @@ public class TestTimelineCollectorManager {
     doReturn(new Configuration()).when(collectorManager).getConfig();
     CollectorNodemanagerProtocol nmCollectorService =
         mock(CollectorNodemanagerProtocol.class);
+    GetTimelineCollectorContextResponse response =
+        GetTimelineCollectorContextResponse.newInstance(null, null, null);
+    try {
+      when(nmCollectorService.getTimelineCollectorContext(any(
+          GetTimelineCollectorContextRequest.class))).thenReturn(response);
+    } catch (YarnException | IOException e) {
+      fail();
+    }
     doReturn(nmCollectorService).when(collectorManager).getNMCollectorService();
     return collectorManager;
   }

+ 15 - 7
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java

@@ -28,9 +28,9 @@ import java.nio.file.Paths;
 import java.util.List;
 
 import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 import org.junit.Test;
 
@@ -52,13 +52,16 @@ public class TestFileSystemTimelineWriterImpl {
     entity.setModifiedTime(1425016502000L);
     te.addEntity(entity);
 
-    try (FileSystemTimelineWriterImpl fsi =
-        new FileSystemTimelineWriterImpl()) {
-      fsi.serviceInit(new Configuration());
-      fsi.write(te);
+    FileSystemTimelineWriterImpl fsi = null;
+    try {
+      fsi = new FileSystemTimelineWriterImpl();
+      fsi.init(new YarnConfiguration());
+      fsi.start();
+      fsi.write("cluster_id", "user_id", "flow_id", "flow_run_id", "app_id", te);
 
-      String fileName = fsi.getOutputRoot() + "/" + type + "/" + id
-          + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+      String fileName = fsi.getOutputRoot() +
+          "/entities/cluster_id/user_id/flow_id/flow_run_id/app_id/" + type +
+          "/" + id + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
       Path path = Paths.get(fileName);
       File f = new File(fileName);
       assertTrue(f.exists() && !f.isDirectory());
@@ -73,6 +76,11 @@ public class TestFileSystemTimelineWriterImpl {
       File outputDir = new File(fsi.getOutputRoot());
       FileUtils.deleteDirectory(outputDir);
       assertTrue(!(f.exists()));
+    } finally {
+      if (fsi != null) {
+        fsi.stop();
+        FileUtils.deleteDirectory(new File(fsi.getOutputRoot()));
+      }
     }
   }
 }