
YARN-3461. Consolidate flow name/version/run defaults. (Sangjin Lee via Varun Saxena)

Varun Saxena 9 years ago
parent
commit
a3cf40e532
9 changed files with 148 additions and 75 deletions
  1. + 24 - 22
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java
  2. + 11 - 7
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
  3. + 6 - 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java
  4. + 51 - 16
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java
  5. + 31 - 5
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java
  6. + 8 - 12
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java
  7. + 2 - 9
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java
  8. + 12 - 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java
  9. + 3 - 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorContext.java

+ 24 - 22
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java

@@ -20,15 +20,12 @@ package org.apache.hadoop.mapred;
 
 import java.io.File;
 import java.io.IOException;
-
 import java.util.EnumSet;
 import java.util.List;
-import java.util.Set;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -38,9 +35,9 @@ import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.MiniYARNCluster;
@@ -48,7 +45,6 @@ import org.apache.hadoop.yarn.server.timeline.TimelineStore;
 import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService;
 import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
-
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -205,7 +201,7 @@ public class TestMRTimelineEventHandling {
       ApplicationReport appReport = apps.get(0);
       firstAppId = appReport.getApplicationId();
 
-      checkNewTimelineEvent(firstAppId);
+      checkNewTimelineEvent(firstAppId, appReport);
 
       LOG.info("Run 2nd job which should be failed.");
       job = UtilsForTests.runJobFail(new JobConf(conf), inDir, outDir);
@@ -214,11 +210,10 @@ public class TestMRTimelineEventHandling {
 
       apps = yarnClient.getApplications(appStates);
       Assert.assertEquals(apps.size(), 2);
-
-      ApplicationId secAppId = null;
-      secAppId = apps.get(0).getApplicationId() == firstAppId ?
-          apps.get(1).getApplicationId() : apps.get(0).getApplicationId();
-      checkNewTimelineEvent(firstAppId);
+
+      appReport = apps.get(0).getApplicationId().equals(firstAppId) ?
+          apps.get(0) : apps.get(1);
+      checkNewTimelineEvent(firstAppId, appReport);
 
     } finally {
       if (cluster != null) {
@@ -235,7 +230,8 @@ public class TestMRTimelineEventHandling {
     }
   }
 
-  private void checkNewTimelineEvent(ApplicationId appId) throws IOException {
+  private void checkNewTimelineEvent(ApplicationId appId,
+      ApplicationReport appReport) throws IOException {
     String tmpRoot =
         FileSystemTimelineWriterImpl.DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT
             + "/entities/";
@@ -243,15 +239,18 @@ public class TestMRTimelineEventHandling {
     File tmpRootFolder = new File(tmpRoot);
 
     Assert.assertTrue(tmpRootFolder.isDirectory());
-    String basePath = tmpRoot + YarnConfiguration.DEFAULT_RM_CLUSTER_ID + "/" +
-        UserGroupInformation.getCurrentUser().getShortUserName() +
-        "/" + TimelineUtils.generateDefaultFlowNameBasedOnAppId(appId) +
-        "/1/1/" + appId.toString();
+    String basePath = tmpRoot + YarnConfiguration.DEFAULT_RM_CLUSTER_ID +
+        "/" + UserGroupInformation.getCurrentUser().getShortUserName() +
+        "/" + appReport.getName() +
+        "/" + TimelineUtils.DEFAULT_FLOW_VERSION +
+        "/" + appReport.getStartTime() +
+        "/" + appId.toString();
     // for this test, we expect MAPREDUCE_JOB and MAPREDUCE_TASK dirs
     String outputDirJob = basePath + "/MAPREDUCE_JOB/";
 
     File entityFolder = new File(outputDirJob);
-    Assert.assertTrue("Job output directory: " + outputDirJob + " is not exist.",
+    Assert.assertTrue("Job output directory: " + outputDirJob +
+        " does not exist.",
         entityFolder.isDirectory());
 
     // check for job event file
@@ -260,13 +259,15 @@ public class TestMRTimelineEventHandling {
 
     String jobEventFilePath = outputDirJob + jobEventFileName;
     File jobEventFile = new File(jobEventFilePath);
-    Assert.assertTrue("jobEventFilePath: " + jobEventFilePath + " is not exist.",
+    Assert.assertTrue("jobEventFilePath: " + jobEventFilePath +
+        " does not exist.",
         jobEventFile.exists());
 
     // check for task event file
     String outputDirTask = basePath + "/MAPREDUCE_TASK/";
     File taskFolder = new File(outputDirTask);
-    Assert.assertTrue("Task output directory: " + outputDirTask + " is not exist.",
+    Assert.assertTrue("Task output directory: " + outputDirTask +
+        " does not exist.",
         taskFolder.isDirectory());
 
     String taskEventFileName = appId.toString().replaceAll("application", "task")
@@ -274,14 +275,15 @@ public class TestMRTimelineEventHandling {
 
     String taskEventFilePath = outputDirTask + taskEventFileName;
     File taskEventFile = new File(taskEventFilePath);
-    Assert.assertTrue("taskEventFileName: " + taskEventFilePath + " is not exist.",
+    Assert.assertTrue("taskEventFileName: " + taskEventFilePath +
+        " does not exist.",
         taskEventFile.exists());
 
     // check for task attempt event file
     String outputDirTaskAttempt = basePath + "/MAPREDUCE_TASK_ATTEMPT/";
     File taskAttemptFolder = new File(outputDirTaskAttempt);
     Assert.assertTrue("TaskAttempt output directory: " + outputDirTaskAttempt +
-        " is not exist.", taskAttemptFolder.isDirectory());
+        " does not exist.", taskAttemptFolder.isDirectory());
 
     String taskAttemptEventFileName = appId.toString().replaceAll(
         "application", "attempt") + "_m_000000_0" +
@@ -291,7 +293,7 @@ public class TestMRTimelineEventHandling {
         taskAttemptEventFileName;
     File taskAttemptEventFile = new File(taskAttemptEventFilePath);
     Assert.assertTrue("taskAttemptEventFileName: " + taskAttemptEventFilePath +
-        " is not exist.", taskAttemptEventFile.exists());
   }
 
   @Test
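
For reference, the entity directory layout that this test now checks is
<storage root>/entities/<cluster id>/<user>/<flow name>/<flow version>/<flow run id>/<app id>/<entity type>/,
with the flow fields taken from the application report instead of the old hard-coded "/1/1/". A minimal
sketch of how the expected base path is assembled, using assumed user, app name, and start-time values
(the constants are the ones referenced in the hunks above):

    // Illustrative only: mirrors the basePath construction in the updated test.
    // "alice", "wordcount", and the timestamps are made-up example values.
    String tmpRoot =
        FileSystemTimelineWriterImpl.DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT
            + "/entities/";
    ApplicationId appId = ApplicationId.newInstance(1234567890123L, 1);
    long startTime = 1234567891000L;                    // appReport.getStartTime()
    String basePath = tmpRoot
        + YarnConfiguration.DEFAULT_RM_CLUSTER_ID       // cluster id
        + "/alice"                                      // user
        + "/wordcount"                                  // flow name = app name
        + "/" + TimelineUtils.DEFAULT_FLOW_VERSION      // flow version, "1"
        + "/" + startTime                               // flow run id = start time
        + "/" + appId;                                  // application_1234567890123_0001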

+ 11 - 7
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java

@@ -372,13 +372,14 @@ public class TestDistributedShell {
     boolean verified = false;
     String errorMessage = "";
     ApplicationId appId = null;
+    ApplicationReport appReport = null;
     while(!verified) {
       List<ApplicationReport> apps = yarnClient.getApplications();
       if (apps.size() == 0 ) {
         Thread.sleep(10);
         continue;
       }
-      ApplicationReport appReport = apps.get(0);
+      appReport = apps.get(0);
       appId = appReport.getApplicationId();
       if(appReport.getHost().equals("N/A")) {
         Thread.sleep(10);
@@ -424,7 +425,7 @@ public class TestDistributedShell {
     if (!isTestingTimelineV2) {
       checkTimelineV1(haveDomain);
     } else {
-      checkTimelineV2(haveDomain, appId, defaultFlow);
+      checkTimelineV2(haveDomain, appId, defaultFlow, appReport);
     }
   }
 
@@ -481,7 +482,7 @@ public class TestDistributedShell {
   }
 
   private void checkTimelineV2(boolean haveDomain, ApplicationId appId,
-      boolean defaultFlow) throws Exception {
+      boolean defaultFlow, ApplicationReport appReport) throws Exception {
     LOG.info("Started checkTimelineV2 ");
     // For PoC check in /tmp/timeline_service_data YARN-3264
     String tmpRoot =
@@ -494,10 +495,13 @@ public class TestDistributedShell {
       String basePath = tmpRoot +
           YarnConfiguration.DEFAULT_RM_CLUSTER_ID + "/" +
           UserGroupInformation.getCurrentUser().getShortUserName() +
-          (defaultFlow ? "/" +
-              TimelineUtils.generateDefaultFlowNameBasedOnAppId(appId) +
-              "/1/1/" : "/test_flow_name/test_flow_version/12345678/") +
-              appId.toString();
+          (defaultFlow ?
+              "/" + appReport.getName() + "/" +
+                  TimelineUtils.DEFAULT_FLOW_VERSION +"/" +
+                  appReport.getStartTime() +"/" :
+              "/test_flow_name/test_flow_version/12345678/") +
+          appId.toString();
+      LOG.info("basePath: " + basePath);
       // for this test, we expect DS_APP_ATTEMPT AND DS_CONTAINER dirs
 
       // Verify DS_APP_ATTEMPT entities posted by the client

+ 6 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java

@@ -49,6 +49,7 @@ public class TimelineUtils {
       "TIMELINE_FLOW_VERSION_TAG";
   public static final String FLOW_RUN_ID_TAG_PREFIX =
       "TIMELINE_FLOW_RUN_ID_TAG";
+  public final static String DEFAULT_FLOW_VERSION = "1";
 
   private static ObjectMapper mapper;
 
@@ -162,9 +163,12 @@ public class TimelineUtils {
     return SecurityUtil.buildTokenService(timelineServiceAddr);
   }
 
-  public static String generateDefaultFlowNameBasedOnAppId(
+  public static String generateDefaultFlowName(String appName,
       ApplicationId appId) {
-    return "flow_" + appId.getClusterTimestamp() + "_" + appId.getId();
+    return (appName != null &&
+        !appName.equals(YarnConfiguration.DEFAULT_APPLICATION_NAME)) ?
+        appName :
+        "flow_" + appId.getClusterTimestamp() + "_" + appId.getId();
   }
 
   /**
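
In short, the renamed helper now prefers a real application name as the flow name and only falls back
to the app-id based name. A minimal sketch of the behavior, with an illustrative ApplicationId:

    ApplicationId appId = ApplicationId.newInstance(1234567890123L, 42);
    TimelineUtils.generateDefaultFlowName("MyPipeline", appId);
    // -> "MyPipeline" (the app name becomes the flow name)
    TimelineUtils.generateDefaultFlowName(null, appId);
    // -> "flow_1234567890123_42"; the same fallback applies when the name
    //    equals YarnConfiguration.DEFAULT_APPLICATION_NAME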

+ 51 - 16
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java

@@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
@@ -198,6 +199,8 @@ public class AMLauncher implements Runnable {
 
     // Finalize the container
     setupTokens(container, containerID);
+    // set the flow context optionally for timeline service v.2
+    setFlowContext(container);
 
     return container;
   }
@@ -229,15 +232,6 @@ public class AMLauncher implements Runnable {
             .get(applicationId)
             .getSubmitTime()));
 
-    if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
-      // Set flow context info
-      for (String tag :
-          rmContext.getRMApps().get(applicationId).getApplicationTags()) {
-        setFlowTags(environment, TimelineUtils.FLOW_NAME_TAG_PREFIX, tag);
-        setFlowTags(environment, TimelineUtils.FLOW_VERSION_TAG_PREFIX, tag);
-        setFlowTags(environment, TimelineUtils.FLOW_RUN_ID_TAG_PREFIX, tag);
-      }
-    }
     Credentials credentials = new Credentials();
     DataInputByteBuffer dibb = new DataInputByteBuffer();
     ByteBuffer tokens = container.getTokens();
@@ -258,17 +252,58 @@ public class AMLauncher implements Runnable {
     container.setTokens(ByteBuffer.wrap(dob.getData(), 0, dob.getLength()));
   }
 
-  private static void setFlowTags(
-      Map<String, String> environment, String tagPrefix, String tag) {
-    if (tag.startsWith(tagPrefix + ":") ||
-        tag.startsWith(tagPrefix.toLowerCase() + ":")) {
-      String value = tag.substring(tagPrefix.length() + 1);
-      if (!value.isEmpty()) {
-        environment.put(tagPrefix, value);
+  private void setFlowContext(ContainerLaunchContext container) {
+    if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
+      Map<String, String> environment = container.getEnvironment();
+      ApplicationId applicationId =
+          application.getAppAttemptId().getApplicationId();
+      RMApp app = rmContext.getRMApps().get(applicationId);
+
+      // initialize the flow in the environment with default values for those
+      // that do not specify the flow tags
+      // flow name: app name (or app id if app name is missing),
+      // flow version: "1", flow run id: start time
+      setFlowTags(environment, TimelineUtils.FLOW_NAME_TAG_PREFIX,
+          TimelineUtils.generateDefaultFlowName(app.getName(), applicationId));
+      setFlowTags(environment, TimelineUtils.FLOW_VERSION_TAG_PREFIX,
+          TimelineUtils.DEFAULT_FLOW_VERSION);
+      setFlowTags(environment, TimelineUtils.FLOW_RUN_ID_TAG_PREFIX,
+          String.valueOf(app.getStartTime()));
+
+      // Set flow context info: the flow context is received via the application
+      // tags
+      for (String tag : app.getApplicationTags()) {
+        String[] parts = tag.split(":", 2);
+        if (parts.length != 2 || parts[1].isEmpty()) {
+          continue;
+        }
+        switch (parts[0].toUpperCase()) {
+        case TimelineUtils.FLOW_NAME_TAG_PREFIX:
+          setFlowTags(environment, TimelineUtils.FLOW_NAME_TAG_PREFIX,
+              parts[1]);
+          break;
+        case TimelineUtils.FLOW_VERSION_TAG_PREFIX:
+          setFlowTags(environment, TimelineUtils.FLOW_VERSION_TAG_PREFIX,
+              parts[1]);
+          break;
+        case TimelineUtils.FLOW_RUN_ID_TAG_PREFIX:
+          setFlowTags(environment, TimelineUtils.FLOW_RUN_ID_TAG_PREFIX,
+              parts[1]);
+          break;
+        default:
+          break;
+        }
       }
     }
   }
 
+  private static void setFlowTags(
+      Map<String, String> environment, String tagPrefix, String value) {
+    if (!value.isEmpty()) {
+      environment.put(tagPrefix, value);
+    }
+  }
+
   @VisibleForTesting
   protected Token<AMRMTokenIdentifier> createAndSetAMRMToken() {
     Token<AMRMTokenIdentifier> amrmToken =
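
With the defaults now seeded in setFlowContext, a submitter only has to pass application tags when it
wants to override them. A hedged sketch of the client side, assuming an ApplicationSubmissionContext
named appContext and illustrative tag values; only tags of the form "<prefix>:<value>" are honored,
matching the split(":", 2) parsing above:

    // Illustrative override of the default flow context via application tags.
    Set<String> tags = new HashSet<>();
    tags.add(TimelineUtils.FLOW_NAME_TAG_PREFIX + ":nightly-etl");
    tags.add(TimelineUtils.FLOW_VERSION_TAG_PREFIX + ":2.3");
    tags.add(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX + ":"
        + System.currentTimeMillis());
    appContext.setApplicationTags(tags);    // appContext is assumed, not shown here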

+ 31 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java

@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.timelineservice;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -25,6 +27,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
 import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorManager;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 
@@ -35,6 +38,9 @@ import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 public class RMTimelineCollectorManager extends TimelineCollectorManager {
+  private static final Log LOG =
+      LogFactory.getLog(RMTimelineCollectorManager.class);
+
   private RMContext rmContext;
 
   public RMTimelineCollectorManager(RMContext rmContext) {
@@ -51,9 +57,21 @@ public class RMTimelineCollectorManager extends TimelineCollectorManager {
           "non-existing app " + appId);
     }
     String userId = app.getUser();
+    TimelineCollectorContext context = collector.getTimelineEntityContext();
     if (userId != null && !userId.isEmpty()) {
-      collector.getTimelineEntityContext().setUserId(userId);
+      context.setUserId(userId);
     }
+
+    // initialize the flow in the environment with default values for those
+    // that do not specify the flow tags
+    // flow name: app name (or app id if app name is missing),
+    // flow version: "1", flow run id: start time
+    context.setFlowName(TimelineUtils.generateDefaultFlowName(
+        app.getName(), appId));
+    context.setFlowVersion(TimelineUtils.DEFAULT_FLOW_VERSION);
+    context.setFlowRunId(app.getStartTime());
+
+    // the flow context is received via the application tags
     for (String tag : app.getApplicationTags()) {
       String[] parts = tag.split(":", 2);
       if (parts.length != 2 || parts[1].isEmpty()) {
@@ -61,14 +79,22 @@ public class RMTimelineCollectorManager extends TimelineCollectorManager {
       }
       switch (parts[0].toUpperCase()) {
       case TimelineUtils.FLOW_NAME_TAG_PREFIX:
-        collector.getTimelineEntityContext().setFlowName(parts[1]);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Setting the flow name: " + parts[1]);
+        }
+        context.setFlowName(parts[1]);
         break;
       case TimelineUtils.FLOW_VERSION_TAG_PREFIX:
-        collector.getTimelineEntityContext().setFlowVersion(parts[1]);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Setting the flow version: " + parts[1]);
+        }
+        context.setFlowVersion(parts[1]);
         break;
       case TimelineUtils.FLOW_RUN_ID_TAG_PREFIX:
-        collector.getTimelineEntityContext().setFlowRunId(
-            Long.parseLong(parts[1]));
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Setting the flow run id: " + parts[1]);
+        }
+        context.setFlowRunId(Long.parseLong(parts[1]));
         break;
       default:
         break;

+ 8 - 12
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java

@@ -83,8 +83,6 @@ public class TestSystemMetricsPublisherForV2 {
 
   private static TimelineServiceV2Publisher metricsPublisher;
   private static DrainDispatcher dispatcher = new DrainDispatcher();
-  private static final String DEFAULT_FLOW_VERSION = "1";
-  private static final long DEFAULT_FLOW_RUN = 1;
 
   private static ConcurrentMap<ApplicationId, RMApp> rmAppsMapInContext;
 
@@ -316,16 +314,14 @@ public class TestSystemMetricsPublisherForV2 {
 
   private String getTimelineEntityDir(RMApp app) {
     String outputDirApp =
-        testRootDir.getAbsolutePath()+"/"
-            + FileSystemTimelineWriterImpl.ENTITIES_DIR
-            + "/"
-            + YarnConfiguration.DEFAULT_RM_CLUSTER_ID
-            + "/"
-            + app.getUser()
-            + "/"
-            + TimelineUtils.generateDefaultFlowNameBasedOnAppId(app
-                .getApplicationId()) + "/" + DEFAULT_FLOW_VERSION + "/"
-            + DEFAULT_FLOW_RUN + "/" + app.getApplicationId();
+        testRootDir.getAbsolutePath() + "/"
+            + FileSystemTimelineWriterImpl.ENTITIES_DIR + "/"
+            + YarnConfiguration.DEFAULT_RM_CLUSTER_ID + "/"
+            + app.getUser() + "/"
+            + app.getName() + "/"
+            + TimelineUtils.DEFAULT_FLOW_VERSION + "/"
+            + app.getStartTime() + "/"
+            + app.getApplicationId();
     return outputDirApp;
   }
 

+ 2 - 9
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java

@@ -18,14 +18,14 @@
 
 package org.apache.hadoop.yarn.server.timelineservice.collector;
 
-import com.google.common.base.Preconditions;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+
+import com.google.common.base.Preconditions;
 
 /**
  * Service that handles writes to the timeline service and writes them to the
@@ -54,13 +54,6 @@ public class AppLevelTimelineCollector extends TimelineCollector {
     // context info from NM.
     // Current user usually is not the app user, but keep this field non-null
     context.setUserId(UserGroupInformation.getCurrentUser().getShortUserName());
-    // Use app ID to generate a default flow name for orphan app
-    context.setFlowName(
-        TimelineUtils.generateDefaultFlowNameBasedOnAppId(appId));
-    // Set the flow version to string 1 if it's an orphan app
-    context.setFlowVersion("1");
-    // Set the flow run ID to 1 if it's an orphan app
-    context.setFlowRunId(1L);
     context.setAppId(appId.toString());
     super.serviceInit(conf);
   }

+ 12 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java

@@ -164,18 +164,30 @@ public class NodeTimelineCollectorManager extends TimelineCollectorManager {
         getNMCollectorService().getTimelineCollectorContext(request);
     String userId = response.getUserId();
     if (userId != null && !userId.isEmpty()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Setting the user in the context: " + userId);
+      }
       collector.getTimelineEntityContext().setUserId(userId);
     }
     String flowName = response.getFlowName();
     if (flowName != null && !flowName.isEmpty()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Setting the flow name: " + flowName);
+      }
       collector.getTimelineEntityContext().setFlowName(flowName);
     }
     String flowVersion = response.getFlowVersion();
     if (flowVersion != null && !flowVersion.isEmpty()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Setting the flow version: " + flowVersion);
+      }
       collector.getTimelineEntityContext().setFlowVersion(flowVersion);
     }
     long flowRunId = response.getFlowRunId();
     if (flowRunId != 0L) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Setting the flow run id: " + flowRunId);
+      }
       collector.getTimelineEntityContext().setFlowRunId(flowRunId);
     }
   }

+ 3 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorContext.java

@@ -19,12 +19,12 @@
 package org.apache.hadoop.yarn.server.timelineservice.collector;
 
 import org.apache.hadoop.yarn.server.timelineservice.TimelineContext;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 
 /**
  * Encapsulates context information required by collector during a put.
 */
 public class TimelineCollectorContext extends TimelineContext {
-
   private String flowVersion;
 
   public TimelineCollectorContext() {
@@ -34,7 +34,8 @@ public class TimelineCollectorContext extends TimelineContext {
   public TimelineCollectorContext(String clusterId, String userId,
       String flowName, String flowVersion, Long flowRunId, String appId) {
     super(clusterId, userId, flowName, flowRunId, appId);
-    this.flowVersion = flowVersion;
+    this.flowVersion = flowVersion == null ?
+        TimelineUtils.DEFAULT_FLOW_VERSION : flowVersion;
   }
 
   @Override
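
As a consequence of this last change, a context constructed without an explicit flow version now carries
"1" rather than null. A small sketch with illustrative argument values (the constructor signature is the
one shown above):

    // Illustrative values; only the null flowVersion argument matters here.
    TimelineCollectorContext ctx = new TimelineCollectorContext(
        "yarn_cluster", "alice", "nightly-etl", null /* flowVersion */,
        1234567891000L, "application_1234567890123_0042");
    // ctx now holds flowVersion == TimelineUtils.DEFAULT_FLOW_VERSION, i.e. "1".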