YARN-9573. DistributedShell cannot specify LogAggregationContext. Contributed by Adam Antal.

Szilard Nemeth, 5 years ago
Parent
Current commit 4216090f19
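
The patch adds a -rolling_log_pattern option to the DistributedShell Client and, when the option is set, attaches a LogAggregationContext to the ApplicationSubmissionContext so that files matching the pattern are aggregated in a rolling fashion while the application is running. The stand-alone sketch below is not part of the patch (the class name, method name and pattern are illustrative); it only exercises the same records API the change relies on, where the third and fourth arguments of newInstance are the rolled-logs include and exclude patterns, as the new test verifies.

// Stand-alone sketch, not part of the patch: class name, method name and
// pattern are illustrative. It mirrors what Client#specifyLogAggregationContext
// does with the YARN records API.
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.util.Records;

public class RollingLogPatternSketch {

  /** Attach a rolled-logs include pattern to an ApplicationSubmissionContext. */
  static ApplicationSubmissionContext withRollingPattern(String pattern) {
    ApplicationSubmissionContext context =
        Records.newRecord(ApplicationSubmissionContext.class);
    // Arguments: includePattern, excludePattern,
    //            rolledLogsIncludePattern, rolledLogsExcludePattern
    LogAggregationContext logAggregationContext =
        LogAggregationContext.newInstance(null, null, pattern, "");
    context.setLogAggregationContext(logAggregationContext);
    return context;
  }

  public static void main(String[] args) {
    ApplicationSubmissionContext context = withRollingPattern(".*(foo|bar)\\d");
    System.out.println(
        context.getLogAggregationContext().getRolledLogsIncludePattern());
  }
}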

+ 34 - 13
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java

@@ -65,6 +65,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.LogAggregationContext;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Priority;
@@ -93,6 +94,7 @@ import org.apache.hadoop.yarn.util.UnitsConversionUtil;
 import org.apache.hadoop.yarn.util.resource.ResourceUtils;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -198,7 +200,9 @@ public class Client {
   private String nodeAttributeSpec = "";
   // log4j.properties file 
   // if available, add to local resources and set into classpath 
-  private String log4jPropFile = "";	
+  private String log4jPropFile = "";
+  // rolling
+  private String rollingFilesPattern = "";
 
   // Start time for client
   private final long clientStartTime = System.currentTimeMillis();
@@ -273,7 +277,7 @@ public class Client {
     }
     if (result) {
       LOG.info("Application completed successfully");
-      System.exit(0);			
+      System.exit(0);
     } 
     LOG.error("Application failed to complete successfully");
     System.exit(2);
@@ -337,6 +341,8 @@ public class Client {
     opts.addOption("enforce_execution_type", false,
         "Flag to indicate whether to enforce execution type of containers");
     opts.addOption("log_properties", true, "log4j.properties file");
+    opts.addOption("rolling_log_pattern", true,
+        "pattern for files that should be aggregated in a rolling fashion");
     opts.addOption("keep_containers_across_application_attempts", false,
         "Flag to indicate whether to keep containers across application "
             + "attempts."
@@ -434,6 +440,10 @@ public class Client {
       }
     }
 
+    if (cliParser.hasOption("rolling_log_pattern")) {
+      rollingFilesPattern = cliParser.getOptionValue("rolling_log_pattern");
+    }
+
     if (cliParser.hasOption("help")) {
       printUsage();
       return false;
@@ -479,7 +489,7 @@ public class Client {
 
     if (!cliParser.hasOption("jar")) {
       throw new IllegalArgumentException("No jar file specified for application master");
-    }		
+    }
 
     appMasterJar = cliParser.getOptionValue("jar");
 
@@ -669,16 +679,16 @@ public class Client {
         + ", queueCurrentCapacity=" + queueInfo.getCurrentCapacity()
         + ", queueMaxCapacity=" + queueInfo.getMaximumCapacity()
         + ", queueApplicationCount=" + queueInfo.getApplications().size()
-        + ", queueChildQueueCount=" + queueInfo.getChildQueues().size());		
+        + ", queueChildQueueCount=" + queueInfo.getChildQueues().size());
 
     List<QueueUserACLInfo> listAclInfo = yarnClient.getQueueAclsInfo();
     for (QueueUserACLInfo aclInfo : listAclInfo) {
       for (QueueACL userAcl : aclInfo.getUserAcls()) {
         LOG.info("User ACL Info for Queue"
-            + ", queueName=" + aclInfo.getQueueName()			
+            + ", queueName=" + aclInfo.getQueueName()
             + ", userAcl=" + userAcl.name());
       }
-    }		
+    }
 
     if (domainId != null && domainId.length() > 0 && toCreateDomain) {
       prepareTimelineDomain();
@@ -775,7 +785,7 @@ public class Client {
 
     // set local resources for the application master
     // local files or archives as needed
-    // In this scenario, the jar file for the application master is part of the local resources			
+    // In this scenario, the jar file for the application master is part of the local resources
     Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
 
     LOG.info("Copy App Master jar from local filesystem and add to local environment");
@@ -796,7 +806,7 @@ public class Client {
     // To do this, we need to first copy into the filesystem that is visible 
     // to the yarn framework. 
     // We do not need to set this as a local resource for the application 
-    // master as the application master does not need it. 		
+    // master as the application master does not need it.
     String hdfsShellScriptLocation = ""; 
     long hdfsShellScriptLen = 0;
     long hdfsShellScriptTimestamp = 0;
@@ -840,7 +850,7 @@ public class Client {
       env.put(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN, domainId);
     }
 
-    // Add AppMaster.jar location to classpath 		
+    // Add AppMaster.jar location to classpath
     // At some point we should not be required to add 
     // the hadoop specific classpaths to the env. 
     // It should be provided out of the box. 
@@ -938,7 +948,7 @@ public class Client {
 
     LOG.info("Completed setting up app master command " + command.toString());
     List<String> commands = new ArrayList<String>();
-    commands.add(command.toString());		
+    commands.add(command.toString());
 
     // Set up the container launch context for the application master
     ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance(
@@ -999,6 +1009,8 @@ public class Client {
     // Set the queue to which this application is to be submitted in the RM
     appContext.setQueue(amQueue);
 
+    specifyLogAggregationContext(appContext);
+
     // Submit the application to the applications manager
     // SubmitApplicationResponse submitResp = applicationsManager.submitApplication(appRequest);
     // Ignore the response as either a valid response object is returned on success 
@@ -1016,6 +1028,15 @@ public class Client {
 
   }
 
+  @VisibleForTesting
+  void specifyLogAggregationContext(ApplicationSubmissionContext appContext) {
+    if (!rollingFilesPattern.isEmpty()) {
+      LogAggregationContext logAggregationContext = LogAggregationContext
+          .newInstance(null, null, rollingFilesPattern, "");
+      appContext.setLogAggregationContext(logAggregationContext);
+    }
+  }
+
   /**
    * Monitor the submitted application for completion. 
    * Kill application if time expires. 
@@ -1064,9 +1085,9 @@ public class Client {
               + " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString()
               + ". Breaking monitoring loop");
           return false;
-        }			  
+        }
       }
-      else if (YarnApplicationState.KILLED == state	
+      else if (YarnApplicationState.KILLED == state
           || YarnApplicationState.FAILED == state) {
         LOG.info("Application did not finish."
             + " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString()
@@ -1100,7 +1121,7 @@ public class Client {
 
     // Response can be ignored as it is non-null on success or 
     // throws an exception in case of failures
-    yarnClient.killApplication(appId);	
+    yarnClient.killApplication(appId);
   }
 
   private void addToLocalResources(FileSystem fs, String fileSrcPath,
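
For reference, this is roughly how a caller would exercise the new option once the patch is applied. It is a hedged sketch: the jar path and pattern are illustrative, and a YarnConfiguration that resolves to a running cluster is assumed. The new test case added to TestDistributedShell below drives the same init() path against the mini-cluster instead.

// Hedged usage sketch: jar path and pattern are illustrative; a cluster-backed
// YarnConfiguration is assumed. Not part of the patch.
import org.apache.hadoop.yarn.applications.distributedshell.Client;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class RollingLogPatternUsage {
  public static void main(String[] unused) throws Exception {
    String[] args = {
        "--jar", "/path/to/distributedshell.jar",   // illustrative path
        "--shell_command", "echo",
        "--rolling_log_pattern", ".*(foo|bar)\\d"   // files aggregated while the app runs
    };
    Client client = new Client(new YarnConfiguration());
    if (client.init(args)) {
      // run() submits the application; specifyLogAggregationContext() is invoked
      // during submission and attaches the rolled-logs include pattern.
      client.run();
    }
  }
}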

+ 28 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java

@@ -17,7 +17,8 @@
  */
 
 package org.apache.hadoop.yarn.applications.distributedshell;
-
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
@@ -60,12 +61,14 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerReport;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LogAggregationContext;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
@@ -95,6 +98,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineW
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin;
 import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree;
+import org.apache.hadoop.yarn.util.Records;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 import org.junit.After;
 import org.junit.Assert;
@@ -956,6 +960,29 @@ public class TestDistributedShell {
     Assert.assertTrue(LOG_AM.isDebugEnabled());
   }
 
+  @Test
+  public void testSpecifyingLogAggregationContext() throws Exception {
+    String regex = ".*(foo|bar)\\d";
+    String[] args = {
+        "--jar",
+        APPMASTER_JAR,
+        "--shell_command",
+        "echo",
+        "--rolling_log_pattern",
+        regex
+    };
+    final Client client =
+        new Client(new Configuration(yarnCluster.getConfig()));
+    Assert.assertTrue(client.init(args));
+
+    ApplicationSubmissionContext context =
+        Records.newRecord(ApplicationSubmissionContext.class);
+    client.specifyLogAggregationContext(context);
+    LogAggregationContext logContext = context.getLogAggregationContext();
+    assertEquals(logContext.getRolledLogsIncludePattern(), regex);
+    assertTrue(logContext.getRolledLogsExcludePattern().isEmpty());
+  }
+
   public void testDSShellWithCommands() throws Exception {
 
     String[] args = {