소스 검색

YARN-5141. Get Container logs for the Running application from Yarn Logs CommandLine. Contributed by Xuan Gong.

Varun Vasudev 9 년 전
부모
커밋
4e1f56e111

+ 60 - 9
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java

@@ -40,12 +40,14 @@ import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -359,7 +361,9 @@ public class LogsCLI extends Configured implements Tool {
     return logFiles;
   }
 
-  private void printContainerLogsFromRunningApplication(Configuration conf,
+  @Private
+  @VisibleForTesting
+  public void printContainerLogsFromRunningApplication(Configuration conf,
       ContainerLogsRequest request, LogCLIHelpers logCliHelper)
       throws IOException {
     String containerIdStr = request.getContainerId().toString();
@@ -797,16 +801,29 @@ public class LogsCLI extends Configured implements Tool {
   }
 
   private int fetchApplicationLogs(ContainerLogsRequest options,
-      LogCLIHelpers logCliHelper) throws IOException {
-    // TODO: YARN-5141. To get container logs for the Running applications.
+      LogCLIHelpers logCliHelper) throws IOException, YarnException {
+    // If the application has finished, we would fetch the logs
+    // from HDFS.
+    // If the application is still running, we would get the full
+    // list of the containers first, then fetch the logs for each
+    // container from NM.
     int resultCode = 0;
-    ContainerLogsRequest newOptions = getMatchedLogOptions(
-        options, logCliHelper);
-    if (newOptions == null) {
-      resultCode = -1;
+    if (options.isAppFinished()) {
+      ContainerLogsRequest newOptions = getMatchedLogOptions(
+          options, logCliHelper);
+      if (newOptions == null) {
+        resultCode = -1;
+      } else {
+        resultCode =
+            logCliHelper.dumpAllContainersLogs(newOptions);
+      }
     } else {
-      resultCode =
-          logCliHelper.dumpAllContainersLogs(newOptions);
+      List<ContainerLogsRequest> containerLogRequests =
+          getContainersLogRequestForRunningApplication(options);
+      for (ContainerLogsRequest container : containerLogRequests) {
+        printContainerLogsFromRunningApplication(getConf(), container,
+            logCliHelper);
+      }
     }
     if (resultCode == -1) {
       System.err.println("Can not find the logs for the application: "
@@ -879,4 +896,38 @@ public class LogsCLI extends Configured implements Tool {
     }
     return false;
   }
+
+  private List<ContainerLogsRequest>
+      getContainersLogRequestForRunningApplication(
+          ContainerLogsRequest options) throws YarnException, IOException {
+    List<ContainerLogsRequest> newOptionsList =
+        new ArrayList<ContainerLogsRequest>();
+    YarnClient yarnClient = createYarnClient();
+    try {
+      List<ApplicationAttemptReport> attempts =
+          yarnClient.getApplicationAttempts(options.getAppId());
+      for (ApplicationAttemptReport attempt : attempts) {
+        List<ContainerReport> containers = yarnClient.getContainers(
+            attempt.getApplicationAttemptId());
+        for (ContainerReport container : containers) {
+          ContainerLogsRequest newOptions = new ContainerLogsRequest(options);
+          newOptions.setContainerId(container.getContainerId().toString());
+          newOptions.setNodeId(container.getAssignedNode().toString());
+          newOptions.setNodeHttpAddress(container.getNodeHttpAddress()
+              .replaceFirst(WebAppUtils.getHttpSchemePrefix(getConf()), ""));
+          // if we do not specify the value for CONTAINER_LOG_FILES option,
+          // we will only output syslog
+          List<String> logFiles = newOptions.getLogTypes();
+          if (logFiles == null || logFiles.isEmpty()) {
+            logFiles = Arrays.asList("syslog");
+            newOptions.setLogTypes(logFiles);
+          }
+          newOptionsList.add(newOptions);
+        }
+      }
+      return newOptionsList;
+    } finally {
+      yarnClient.close();
+    }
+  }
 }

+ 71 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java

@@ -22,9 +22,13 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 import java.io.BufferedReader;
 import java.io.ByteArrayOutputStream;
@@ -53,15 +57,18 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerReport;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
+import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest;
 import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
 import org.apache.hadoop.yarn.logaggregation.LogCLIHelpers;
 import org.junit.Assert;
@@ -396,6 +403,56 @@ public class TestLogsCLI {
     fs.delete(new Path(rootLogDir), true);
   }
 
+  @Test (timeout = 5000)
+  public void testFetchRunningApplicationLogs() throws Exception {
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    NodeId nodeId = NodeId.newInstance("localhost", 1234);
+    ApplicationId appId = ApplicationId.newInstance(0, 1);
+    ApplicationAttemptId appAttemptId = ApplicationAttemptId
+        .newInstance(appId, 1);
+    // Create a mock ApplicationAttempt Report
+    ApplicationAttemptReport mockAttemptReport = mock(
+        ApplicationAttemptReport.class);
+    doReturn(appAttemptId).when(mockAttemptReport).getApplicationAttemptId();
+    List<ApplicationAttemptReport> attemptReports = Arrays.asList(
+        mockAttemptReport);
+
+    // Create two mock containerReports
+    ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1);
+    ContainerReport mockContainerReport1 = mock(ContainerReport.class);
+    doReturn(containerId1).when(mockContainerReport1).getContainerId();
+    doReturn(nodeId).when(mockContainerReport1).getAssignedNode();
+    doReturn("http://localhost:2345").when(mockContainerReport1)
+        .getNodeHttpAddress();
+    ContainerId containerId2 = ContainerId.newContainerId(appAttemptId, 2);
+    ContainerReport mockContainerReport2 = mock(ContainerReport.class);
+    doReturn(containerId2).when(mockContainerReport2).getContainerId();
+    doReturn(nodeId).when(mockContainerReport2).getAssignedNode();
+    doReturn("http://localhost:2345").when(mockContainerReport2)
+        .getNodeHttpAddress();
+    List<ContainerReport> containerReports = Arrays.asList(
+        mockContainerReport1, mockContainerReport2);
+
+    // Mock the YarnClient, and it would report the previous created
+    // mockAttemptReport and previous two created mockContainerReports
+    YarnClient mockYarnClient = createMockYarnClient(
+        YarnApplicationState.RUNNING, ugi.getShortUserName(), true,
+        attemptReports, containerReports);
+    LogsCLI cli = spy(new LogsCLIForTest(mockYarnClient));
+    doNothing().when(cli).printContainerLogsFromRunningApplication(
+        any(Configuration.class), any(ContainerLogsRequest.class),
+        any(LogCLIHelpers.class));
+
+    cli.setConf(new YarnConfiguration());
+    int exitCode = cli.run(new String[] {"-applicationId", appId.toString()});
+    assertTrue(exitCode == 0);
+    // we have two container reports, so make sure we have called
+    // printContainerLogsFromRunningApplication twice
+    verify(cli, times(2)).printContainerLogsFromRunningApplication(
+        any(Configuration.class), any(ContainerLogsRequest.class),
+        any(LogCLIHelpers.class));
+  }
+
   @Test (timeout = 15000)
   public void testFetchApplictionLogsAsAnotherUser() throws Exception {
     String remoteLogRootDir = "target/logs/";
@@ -916,14 +973,26 @@ public class TestLogsCLI {
   }
 
   private YarnClient createMockYarnClient(YarnApplicationState appState,
-      String user)
-      throws YarnException, IOException {
+      String user) throws YarnException, IOException {
+    return createMockYarnClient(appState, user, false, null, null);
+  }
+
+  private YarnClient createMockYarnClient(YarnApplicationState appState,
+      String user, boolean mockContainerReport,
+      List<ApplicationAttemptReport> mockAttempts,
+      List<ContainerReport> mockContainers) throws YarnException, IOException {
     YarnClient mockClient = mock(YarnClient.class);
     ApplicationReport mockAppReport = mock(ApplicationReport.class);
     doReturn(user).when(mockAppReport).getUser();
     doReturn(appState).when(mockAppReport).getYarnApplicationState();
     doReturn(mockAppReport).when(mockClient).getApplicationReport(
         any(ApplicationId.class));
+    if (mockContainerReport) {
+      doReturn(mockAttempts).when(mockClient).getApplicationAttempts(
+          any(ApplicationId.class));
+      doReturn(mockContainers).when(mockClient).getContainers(any(
+          ApplicationAttemptId.class));
+    }
     return mockClient;
   }