Browse Source

YARN-3125. Made the distributed shell use timeline service next gen and add an integration test for it. Contributed by Junping Du and Li Lu.

(cherry picked from commit bf08f7f0ed4900ce52f98137297dd1a47ba2a536)
Zhijie Shen 10 years ago
parent
commit
0b9b25de43

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -20,6 +20,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
     YARN-3087. Made the REST server of per-node aggregator work alone in NM
     daemon. (Li Lu via zjshen)
 
+    YARN-3125. Made the distributed shell use timeline service next gen and
+    add an integration test for it. (Junping Du and Li Lu via zjshen)
+
   IMPROVEMENTS
 
   OPTIMIZATIONS

+ 158 - 11
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java

@@ -212,6 +212,8 @@ public class ApplicationMaster {
   private int appMasterRpcPort = -1;
   // Tracking url to which app master publishes info for clients to monitor
   private String appMasterTrackingUrl = "";
+  
+  private boolean newTimelineService = false;
 
   // App Master configuration
   // No. of containers to run shell command on
@@ -372,7 +374,8 @@ public class ApplicationMaster {
         "No. of containers on which the shell command needs to be executed");
     opts.addOption("priority", true, "Application Priority. Default 0");
     opts.addOption("debug", false, "Dump out debug information");
-
+    opts.addOption("timeline_service_version", true, 
+        "Version for timeline service");
     opts.addOption("help", false, "Print usage");
     CommandLine cliParser = new GnuParser().parse(opts, args);
 
@@ -508,6 +511,30 @@ public class ApplicationMaster {
     }
     requestPriority = Integer.parseInt(cliParser
         .getOptionValue("priority", "0"));
+
+    if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
+      YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
+      if (cliParser.hasOption("timeline_service_version")) {
+        String timelineServiceVersion = 
+            cliParser.getOptionValue("timeline_service_version", "v1");
+        if (timelineServiceVersion.trim().equalsIgnoreCase("v1")) {
+          newTimelineService = false;
+        } else if (timelineServiceVersion.trim().equalsIgnoreCase("v2")) {
+          newTimelineService = true;
+        } else {
+          throw new IllegalArgumentException(
+              "timeline_service_version is not set properly, should be 'v1' or 'v2'");
+        }
+      }
+    } else {
+      timelineClient = null;
+      LOG.warn("Timeline service is not enabled");
+      if (cliParser.hasOption("timeline_service_version")) {
+        throw new IllegalArgumentException(
+            "Timeline service is not enabled");
+      }
+    }
+
     return true;
   }
 
@@ -555,7 +582,6 @@ public class ApplicationMaster {
         UserGroupInformation.createRemoteUser(appSubmitterUserName);
     appSubmitterUgi.addCredentials(credentials);
 
-
     AMRMClientAsync.AbstractCallbackHandler allocListener =
         new RMCallbackHandler();
     amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
@@ -569,8 +595,14 @@ public class ApplicationMaster {
 
     startTimelineClient(conf);
     if(timelineClient != null) {
-      publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
-          DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi);
+      if (newTimelineService) {
+        publishApplicationAttemptEventOnNewTimelineService(timelineClient, 
+            appAttemptID.toString(), DSEvent.DS_APP_ATTEMPT_START, domainId, 
+            appSubmitterUgi);
+      } else {
+        publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
+            DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi);
+      }
     }
 
     // Setup local RPC Server to accept status requests directly from clients
@@ -673,9 +705,15 @@ public class ApplicationMaster {
       } catch (InterruptedException ex) {}
     }
 
-    if(timelineClient != null) {
-      publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
+    if (timelineClient != null) {
+      if (newTimelineService) {
+        publishApplicationAttemptEventOnNewTimelineService(timelineClient,
+          appAttemptID.toString(), DSEvent.DS_APP_ATTEMPT_END, domainId,
+          appSubmitterUgi);
+      } else {
+        publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
           DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi);
+      }
     }
 
     // Join all launched threads
@@ -781,8 +819,13 @@ public class ApplicationMaster {
               + containerStatus.getContainerId());
         }
         if(timelineClient != null) {
-          publishContainerEndEvent(
-              timelineClient, containerStatus, domainId, appSubmitterUgi);
+          if (newTimelineService) {
+            publishContainerEndEventOnNewTimelineService(
+                timelineClient, containerStatus, domainId, appSubmitterUgi);
+          } else {
+            publishContainerEndEvent(
+                timelineClient, containerStatus, domainId, appSubmitterUgi);
+          }
         }
       }
       
@@ -904,9 +947,15 @@ public class ApplicationMaster {
         applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId());
       }
       if(applicationMaster.timelineClient != null) {
-        ApplicationMaster.publishContainerStartEvent(
-            applicationMaster.timelineClient, container,
-            applicationMaster.domainId, applicationMaster.appSubmitterUgi);
+        if (applicationMaster.newTimelineService) {
+            ApplicationMaster.publishContainerStartEventOnNewTimelineService(
+                applicationMaster.timelineClient, container,
+                applicationMaster.domainId, applicationMaster.appSubmitterUgi);
+        } else {
+          ApplicationMaster.publishContainerStartEvent(
+              applicationMaster.timelineClient, container,
+              applicationMaster.domainId, applicationMaster.appSubmitterUgi);
+        }
       }
     }
 
@@ -1240,4 +1289,102 @@ public class ApplicationMaster {
             shellId);
     return new Thread(runnableLaunchContainer);
   }
+  
+  private static void publishContainerStartEventOnNewTimelineService(
+      final TimelineClient timelineClient, Container container, String domainId,
+      UserGroupInformation ugi) {
+    final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = 
+        new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity();
+    entity.setId(container.getId().toString());
+    entity.setType(DSEntity.DS_CONTAINER.toString());
+    //entity.setDomainId(domainId);
+    entity.addInfo("user", ugi.getShortUserName());
+    
+    org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event = 
+        new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent();
+    event.setTimestamp(System.currentTimeMillis());
+    event.setId(DSEvent.DS_CONTAINER_START.toString());
+    event.addInfo("Node", container.getNodeId().toString());
+    event.addInfo("Resources", container.getResource().toString());
+    entity.addEvent(event);
+
+    try {
+      ugi.doAs(new PrivilegedExceptionAction<Object>() {
+        @Override
+        public TimelinePutResponse run() throws Exception {
+          timelineClient.putEntities(entity);
+          return null;
+        }
+      });
+    } catch (Exception e) {
+      LOG.error("Container start event could not be published for "
+          + container.getId().toString(),
+          e instanceof UndeclaredThrowableException ? e.getCause() : e);
+    }
+  }
+
+  private static void publishContainerEndEventOnNewTimelineService(
+      final TimelineClient timelineClient, ContainerStatus container,
+      String domainId, UserGroupInformation ugi) {
+    final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = 
+        new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity();
+    entity.setId(container.getContainerId().toString());
+    entity.setType(DSEntity.DS_CONTAINER.toString());
+    //entity.setDomainId(domainId);
+    entity.addInfo("user", ugi.getShortUserName());
+    org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event = 
+        new  org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent();
+    event.setTimestamp(System.currentTimeMillis());
+    event.setId(DSEvent.DS_CONTAINER_END.toString());
+    event.addInfo("State", container.getState().name());
+    event.addInfo("Exit Status", container.getExitStatus());
+    entity.addEvent(event);
+
+    try {
+      ugi.doAs(new PrivilegedExceptionAction<Object>() {
+        @Override
+        public TimelinePutResponse run() throws Exception {
+          timelineClient.putEntities(entity);
+          return null;
+        }
+      });
+    } catch (Exception e) {
+      LOG.error("Container end event could not be published for "
+          + container.getContainerId().toString(),
+          e instanceof UndeclaredThrowableException ? e.getCause() : e);
+    }
+  }
+
+  private static void publishApplicationAttemptEventOnNewTimelineService(
+      final TimelineClient timelineClient, String appAttemptId,
+      DSEvent appEvent, String domainId, UserGroupInformation ugi) {
+    final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = 
+        new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity();
+    entity.setId(appAttemptId);
+    entity.setType(DSEntity.DS_APP_ATTEMPT.toString());
+    //entity.setDomainId(domainId);
+    entity.addInfo("user", ugi.getShortUserName());
+    org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event = 
+        new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent();
+    event.setId(appEvent.toString());
+    event.setTimestamp(System.currentTimeMillis());
+    entity.addEvent(event);
+
+    try {
+      ugi.doAs(new PrivilegedExceptionAction<Object>() {
+        @Override
+        public TimelinePutResponse run() throws Exception {
+          timelineClient.putEntities(entity);
+          return null;
+        }
+      });
+    } catch (Exception e) {
+      LOG.error("App Attempt "
+          + (appEvent.equals(DSEvent.DS_APP_ATTEMPT_START) ? "start" : "end")
+          + " event could not be published for "
+          + appAttemptId.toString(),
+          e instanceof UndeclaredThrowableException ? e.getCause() : e);
+    }
+  }
+
 }

+ 18 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java

@@ -185,6 +185,8 @@ public class Client {
 
   // Command line options
   private Options opts;
+  
+  private String timelineServiceVersion;
 
   private static final String shellCommandPath = "shellCommands";
   private static final String shellArgsPath = "shellArgs";
@@ -260,6 +262,7 @@ public class Client {
     opts.addOption("container_vcores", true, "Amount of virtual cores to be requested to run the shell command");
     opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed");
     opts.addOption("log_properties", true, "log4j.properties file");
+    opts.addOption("timeline_service_version", true, "Version for timeline service");
     opts.addOption("keep_containers_across_application_attempts", false,
       "Flag to indicate whether to keep containers across application attempts." +
       " If the flag is true, running containers will not be killed when" +
@@ -354,6 +357,16 @@ public class Client {
       throw new IllegalArgumentException("Invalid virtual cores specified for application master, exiting."
           + " Specified virtual cores=" + amVCores);
     }
+    
+    if (cliParser.hasOption("timeline_service_version")) {
+      timelineServiceVersion = 
+        cliParser.getOptionValue("timeline_service_version", "v1");
+      if (! (timelineServiceVersion.trim().equalsIgnoreCase("v1") || 
+          timelineServiceVersion.trim().equalsIgnoreCase("v2"))) {
+        throw new IllegalArgumentException(
+              "timeline_service_version is not set properly, should be 'v1' or 'v2'");
+      }
+    }
 
     if (!cliParser.hasOption("jar")) {
       throw new IllegalArgumentException("No jar file specified for application master");
@@ -633,11 +646,14 @@ public class Client {
 
     for (Map.Entry<String, String> entry : shellEnv.entrySet()) {
       vargs.add("--shell_env " + entry.getKey() + "=" + entry.getValue());
-    }			
+    }
     if (debugFlag) {
       vargs.add("--debug");
     }
 
+    if (timelineServiceVersion != null) {
+      vargs.add("--timeline_service_version " + timelineServiceVersion);
+    }
     vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout");
     vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr");
 
@@ -647,7 +663,7 @@ public class Client {
       command.append(str).append(" ");
     }
 
-    LOG.info("Completed setting up app master command " + command.toString());	   
+    LOG.info("Completed setting up app master command " + command.toString());
     List<String> commands = new ArrayList<String>();
     commands.add(command.toString());		
 

+ 69 - 10
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java

@@ -50,10 +50,13 @@ import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.MiniYARNCluster;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.timelineservice.aggregator.PerNodeAggregatorServer;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TestName;
 
 public class TestDistributedShell {
 
@@ -63,10 +66,14 @@ public class TestDistributedShell {
   protected MiniYARNCluster yarnCluster = null;  
   protected YarnConfiguration conf = null;
   private static final int NUM_NMS = 1;
+  private static final String TIMELINE_AUX_SERVICE_NAME = "timeline_aggregator";
 
   protected final static String APPMASTER_JAR =
       JarFinder.getJar(ApplicationMaster.class);
 
+  @Rule
+  public TestName currTestName= new TestName();
+
   @Before
   public void setup() throws Exception {
     setupInternal(NUM_NMS);
@@ -75,20 +82,32 @@ public class TestDistributedShell {
   protected void setupInternal(int numNodeManager) throws Exception {
 
     LOG.info("Starting up YARN cluster");
-    
+
     conf = new YarnConfiguration();
     conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 128);
     conf.set("yarn.log.dir", "target");
     conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+    // mark if we need to launch the v1 timeline server
+    boolean enableATSV1 = false;
+    if (!currTestName.getMethodName().toLowerCase().contains("v2")) {
+      // disable aux-service based timeline aggregators
+      conf.set(YarnConfiguration.NM_AUX_SERVICES, "");
+      enableATSV1 = true;
+    } else {
+      // enable aux-service based timeline aggregators
+      conf.set(YarnConfiguration.NM_AUX_SERVICES, TIMELINE_AUX_SERVICE_NAME);
+      conf.set(YarnConfiguration.NM_AUX_SERVICES + "." + TIMELINE_AUX_SERVICE_NAME
+        + ".class", PerNodeAggregatorServer.class.getName());
+    }
     conf.set(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class.getName());
     conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
     conf.set("mapreduce.jobhistory.address",
         "0.0.0.0:" + ServerSocketUtil.getPort(10021, 10));
-    
+
     if (yarnCluster == null) {
       yarnCluster =
           new MiniYARNCluster(TestDistributedShell.class.getSimpleName(), 1,
-              numNodeManager, 1, 1);
+              numNodeManager, 1, 1, enableATSV1);
       yarnCluster.init(conf);
       
       yarnCluster.start();
@@ -148,15 +167,21 @@ public class TestDistributedShell {
   
   @Test(timeout=90000)
   public void testDSShellWithDomain() throws Exception {
-    testDSShell(true);
+    testDSShell(true, "v1");
   }
 
   @Test(timeout=90000)
   public void testDSShellWithoutDomain() throws Exception {
-    testDSShell(false);
+    testDSShell(false, "v1");
   }
 
-  public void testDSShell(boolean haveDomain) throws Exception {
+  @Test(timeout=90000)
+  public void testDSShellWithoutDomainV2() throws Exception {
+    testDSShell(false, "v2");
+  }
+
+  public void testDSShell(boolean haveDomain, String timelineVersion)
+      throws Exception {
     String[] args = {
         "--jar",
         APPMASTER_JAR,
@@ -183,9 +208,17 @@ public class TestDistributedShell {
           "writer_user writer_group",
           "--create"
       };
-      List<String> argsList = new ArrayList<String>(Arrays.asList(args));
-      argsList.addAll(Arrays.asList(domainArgs));
-      args = argsList.toArray(new String[argsList.size()]);
+      args = mergeArgs(args, domainArgs);
+    }
+    boolean isTestingTimelineV2 = false;
+    if (timelineVersion.equalsIgnoreCase("v2")) {
+      String[] timelineArgs = {
+          "--timeline_service_version",
+          "v2"
+      };
+      isTestingTimelineV2 = true;
+      args = mergeArgs(args, timelineArgs);
+      LOG.info("Setup: Using timeline v2!");
     }
 
     LOG.info("Initializing DS Client");
@@ -239,7 +272,15 @@ public class TestDistributedShell {
     LOG.info("Client run completed. Result=" + result);
     Assert.assertTrue(result.get());
 
-    TimelineDomain domain = null;
+    if (!isTestingTimelineV2) {
+      checkTimelineV1(haveDomain);
+    } else {
+      checkTimelineV2(haveDomain);
+    }
+  }
+
+  private void checkTimelineV1(boolean haveDomain) throws Exception {
+        TimelineDomain domain = null;
     if (haveDomain) {
       domain = yarnCluster.getApplicationHistoryServer()
           .getTimelineStore().getDomain("TEST_DOMAIN");
@@ -283,6 +324,24 @@ public class TestDistributedShell {
     }
   }
 
+  private void checkTimelineV2(boolean haveDomain) {
+    // TODO check timeline V2 here after we have a storage layer
+  }
+
+  /**
+   * Utility function to merge two String arrays to form a new String array for
+   * our argumemts.
+   *
+   * @param args
+   * @param newArgs
+   * @return a String array consists of {args, newArgs}
+   */
+  private String[] mergeArgs(String[] args, String[] newArgs) {
+    List<String> argsList = new ArrayList<String>(Arrays.asList(args));
+    argsList.addAll(Arrays.asList(newArgs));
+    return argsList.toArray(new String[argsList.size()]);
+  }
+
   /*
    * NetUtils.getHostname() returns a string in the form "hostname/ip".
    * Sometimes the hostname we get is the FQDN and sometimes the short name. In

+ 6 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/BaseAggregatorService.java

@@ -72,6 +72,12 @@ public class BaseAggregatorService extends CompositeService {
    */
   public void postEntities(TimelineEntities entities,
       UserGroupInformation callerUgi) {
+    // Add this output temporarily for our prototype
+    // TODO remove this after we have an actual implementation
+    LOG.info("SUCCESS - TIMELINE V2 PROTOTYPE");
+    LOG.info("postEntities(entities=" + entities + ", callerUgi=" +
+        callerUgi + ")");
+
     // TODO implement
     if (LOG.isDebugEnabled()) {
       LOG.debug("postEntities(entities=" + entities + ", callerUgi=" +

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorServer.java

@@ -118,8 +118,8 @@ public class PerNodeAggregatorServer extends AuxiliaryService {
           .setConf(conf)
           .addEndpoint(URI.create("http://" + bindAddress));
       timelineRestServer = builder.build();
-      // TODO: replace this by an authentification filter in future.
-      HashMap<String, String> options = new HashMap<String, String>();
+      // TODO: replace this by an authentication filter in future.
+      HashMap<String, String> options = new HashMap<>();
       String username = conf.get(HADOOP_HTTP_STATIC_USER,
           DEFAULT_HADOOP_HTTP_STATIC_USER);
       options.put(HADOOP_HTTP_STATIC_USER, username);