Browse Source

YARN-1131. logs command should return an appropriate error message if YARN application is still running. Contributed by Siddharth Seth.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1529068 13f79535-47bb-0310-9956-ffa450edef68
Hitesh Shah 11 years ago
parent
commit
6ff600d9e3

+ 2 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java

@@ -54,7 +54,7 @@ import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
-import org.apache.hadoop.yarn.logaggregation.LogDumper;
+import org.apache.hadoop.yarn.logaggregation.LogCLIHelpers;
 
 import com.google.common.base.Charsets;
 
@@ -359,7 +359,7 @@ public class CLI extends Configured implements Tool {
         JobID jobID = JobID.forName(jobid);
         TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskid);
         LogParams logParams = cluster.getLogParams(jobID, taskAttemptID);
-        LogDumper logDumper = new LogDumper();
+        LogCLIHelpers logDumper = new LogCLIHelpers();
         logDumper.setConf(getConf());
         exitCode = logDumper.dumpAContainersLogs(logParams.getApplicationId(),
             logParams.getContainerId(), logParams.getNodeId(),

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -153,6 +153,9 @@ Release 2.1.2 - UNRELEASED
     YARN-1271. "Text file busy" errors launching containers again
     (Sandy Ryza)
 
+    YARN-1131. $yarn logs command should return an appropriate error message if
+    YARN application is still running. (Siddharth Seth via hitesh)
+
 Release 2.1.1-beta - 2013-09-23
 
   INCOMPATIBLE CHANGES

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/bin/yarn

@@ -210,7 +210,7 @@ elif [ "$COMMAND" = "jar" ] ; then
   CLASS=org.apache.hadoop.util.RunJar
   YARN_OPTS="$YARN_OPTS $YARN_CLIENT_OPTS"
 elif [ "$COMMAND" = "logs" ] ; then
-  CLASS=org.apache.hadoop.yarn.logaggregation.LogDumper
+  CLASS=org.apache.hadoop.yarn.client.cli.LogsCLI
   YARN_OPTS="$YARN_OPTS $YARN_CLIENT_OPTS"
 elif [ "$COMMAND" = "daemonlog" ] ; then
   CLASS=org.apache.hadoop.log.LogLevel

+ 65 - 125
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogDumper.java → hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java

@@ -16,45 +16,39 @@
 * limitations under the License.
 */
 
-package org.apache.hadoop.yarn.logaggregation;
+package org.apache.hadoop.yarn.client.cli;
 
-import java.io.DataInputStream;
-import java.io.EOFException;
-import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.io.PrintStream;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
-import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
+import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
+import org.apache.hadoop.yarn.logaggregation.LogCLIHelpers;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 
 import com.google.common.annotations.VisibleForTesting;
 
 @Public
 @Evolving
-public class LogDumper extends Configured implements Tool {
+public class LogsCLI extends Configured implements Tool {
 
   private static final String CONTAINER_ID_OPTION = "containerId";
   private static final String APPLICATION_ID_OPTION = "applicationId";
@@ -65,7 +59,9 @@ public class LogDumper extends Configured implements Tool {
   public int run(String[] args) throws Exception {
 
     Options opts = new Options();
-    opts.addOption(APPLICATION_ID_OPTION, true, "ApplicationId (required)");
+    Option appIdOpt = new Option(APPLICATION_ID_OPTION, true, "ApplicationId (required)");
+    appIdOpt.setRequired(true);
+    opts.addOption(appIdOpt);
     opts.addOption(CONTAINER_ID_OPTION, true,
       "ContainerId (must be specified if node address is specified)");
     opts.addOption(NODE_ADDRESS_OPTION, true, "NodeAddress in the format "
@@ -99,28 +95,46 @@ public class LogDumper extends Configured implements Tool {
       nodeAddress = commandLine.getOptionValue(NODE_ADDRESS_OPTION);
       appOwner = commandLine.getOptionValue(APP_OWNER_OPTION);
     } catch (ParseException e) {
-      System.out.println("options parsing failed: " + e.getMessage());
+      System.err.println("options parsing failed: " + e.getMessage());
       printHelpMessage(printOpts);
       return -1;
     }
 
     if (appIdStr == null) {
-      System.out.println("ApplicationId cannot be null!");
+      System.err.println("ApplicationId cannot be null!");
       printHelpMessage(printOpts);
       return -1;
     }
 
-    RecordFactory recordFactory =
-        RecordFactoryProvider.getRecordFactory(getConf());
-    ApplicationId appId =
-        ConverterUtils.toApplicationId(recordFactory, appIdStr);
+    ApplicationId appId = null;
+    try {
+      appId = ConverterUtils.toApplicationId(appIdStr);
+    } catch (Exception e) {
+      System.err.println("Invalid ApplicationId specified");
+      return -1;
+    }
+    
+    try {
+      int resultCode = verifyApplicationState(appId);
+      if (resultCode != 0) {
+        System.out.println("Application has not completed." +
+        		" Logs are only available after an application completes");
+        return resultCode;
+      }
+    } catch (Exception e) {
+      System.err.println("Unable to get ApplicationState." +
+      		" Attempting to fetch logs directly from the filesystem.");
+    }
 
+    LogCLIHelpers logCliHelper = new LogCLIHelpers();
+    logCliHelper.setConf(getConf());
+    
     if (appOwner == null || appOwner.isEmpty()) {
       appOwner = UserGroupInformation.getCurrentUser().getShortUserName();
     }
     int resultCode = 0;
     if (containerIdStr == null && nodeAddress == null) {
-      resultCode = dumpAllContainersLogs(appId, appOwner, System.out);
+      resultCode = logCliHelper.dumpAllContainersLogs(appId, appOwner, System.out);
     } else if ((containerIdStr == null && nodeAddress != null)
         || (containerIdStr != null && nodeAddress == null)) {
       System.out.println("ContainerId or NodeAddress cannot be null!");
@@ -138,123 +152,49 @@ public class LogDumper extends Configured implements Tool {
                   appOwner,
                   ConverterUtils.toNodeId(nodeAddress),
                   LogAggregationUtils.getRemoteNodeLogDirSuffix(getConf())));
-      resultCode = dumpAContainerLogs(containerIdStr, reader, System.out);
+      resultCode = logCliHelper.dumpAContainerLogs(containerIdStr, reader, System.out);
     }
 
     return resultCode;
   }
 
-  @Private
-  @VisibleForTesting
-  public int dumpAContainersLogs(String appId, String containerId,
-      String nodeId, String jobOwner) throws IOException {
-    Path remoteRootLogDir =
-        new Path(getConf().get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
-            YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
-    String suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(getConf());
-    Path logPath = LogAggregationUtils.getRemoteNodeLogFileForApp(
-        remoteRootLogDir, ConverterUtils.toApplicationId(appId), jobOwner,
-        ConverterUtils.toNodeId(nodeId), suffix);
-    AggregatedLogFormat.LogReader reader;
-    try {
-      reader = new AggregatedLogFormat.LogReader(getConf(), logPath);
-    } catch (FileNotFoundException fnfe) {
-      System.out.println("Logs not available at " + logPath.toString());
-      System.out.println(
-          "Log aggregation has not completed or is not enabled.");
-      return -1;
-    }
-    return dumpAContainerLogs(containerId, reader, System.out);
-  }
-
-  private int dumpAContainerLogs(String containerIdStr,
-      AggregatedLogFormat.LogReader reader, PrintStream out)
-      throws IOException {
-    DataInputStream valueStream;
-    LogKey key = new LogKey();
-    valueStream = reader.next(key);
-
-    while (valueStream != null && !key.toString().equals(containerIdStr)) {
-      // Next container
-      key = new LogKey();
-      valueStream = reader.next(key);
-    }
-
-    if (valueStream == null) {
-      System.out.println("Logs for container " + containerIdStr
-          + " are not present in this log-file.");
-      return -1;
-    }
-
-    while (true) {
-      try {
-        LogReader.readAContainerLogsForALogType(valueStream, out);
-      } catch (EOFException eof) {
-        break;
-      }
-    }
-    return 0;
-  }
+  private int verifyApplicationState(ApplicationId appId) throws IOException,
+      YarnException {
+    YarnClient yarnClient = createYarnClient();
 
-  private int dumpAllContainersLogs(ApplicationId appId, String appOwner,
-      PrintStream out) throws IOException {
-    Path remoteRootLogDir =
-        new Path(getConf().get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
-            YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
-    String user = appOwner;
-    String logDirSuffix =
-        LogAggregationUtils.getRemoteNodeLogDirSuffix(getConf());
-    //TODO Change this to get a list of files from the LAS.
-    Path remoteAppLogDir =
-        LogAggregationUtils.getRemoteAppLogDir(remoteRootLogDir, appId, user,
-            logDirSuffix);
-    RemoteIterator<FileStatus> nodeFiles;
     try {
-      nodeFiles = FileContext.getFileContext().listStatus(remoteAppLogDir);
-    } catch (FileNotFoundException fnf) {
-      System.out.println("Logs not available at "
-          + remoteAppLogDir.toString());
-      System.out.println(
-          "Log aggregation has not completed or is not enabled.");
-      return -1;
-    }
-    while (nodeFiles.hasNext()) {
-      FileStatus thisNodeFile = nodeFiles.next();
-      AggregatedLogFormat.LogReader reader =
-          new AggregatedLogFormat.LogReader(getConf(),
-              new Path(remoteAppLogDir, thisNodeFile.getPath().getName()));
-      try {
-
-        DataInputStream valueStream;
-        LogKey key = new LogKey();
-        valueStream = reader.next(key);
-
-        while (valueStream != null) {
-          String containerString = "\n\nContainer: " + key + " on " + thisNodeFile.getPath().getName();
-          out.println(containerString);
-          out.println(StringUtils.repeat("=", containerString.length()));
-          while (true) {
-            try {
-              LogReader.readAContainerLogsForALogType(valueStream, out);
-            } catch (EOFException eof) {
-              break;
-            }
-          }
+      ApplicationReport appReport = yarnClient.getApplicationReport(appId);
+      switch (appReport.getYarnApplicationState()) {
+      case NEW:
+      case NEW_SAVING:
+      case ACCEPTED:
+      case SUBMITTED:
+      case RUNNING:
+        return -1;
+      case FAILED:
+      case FINISHED:
+      case KILLED:
+      default:
+        break;
 
-          // Next container
-          key = new LogKey();
-          valueStream = reader.next(key);
-        }
-      } finally {
-        reader.close();
       }
+    } finally {
+      yarnClient.close();
     }
     return 0;
   }
+  
+  @VisibleForTesting
+  protected YarnClient createYarnClient() {
+    YarnClient yarnClient = YarnClient.createYarnClient();
+    yarnClient.init(getConf());
+    yarnClient.start();
+    return yarnClient;
+  }
 
   public static void main(String[] args) throws Exception {
     Configuration conf = new YarnConfiguration();
-    LogDumper logDumper = new LogDumper();
+    LogsCLI logDumper = new LogsCLI();
     logDumper.setConf(conf);
     int exitCode = logDumper.run(args);
     System.exit(exitCode);

+ 172 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java

@@ -0,0 +1,172 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.client.cli;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.io.PrintWriter;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.logaggregation.LogCLIHelpers;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestLogsCLI {
+  ByteArrayOutputStream sysOutStream;
+  private PrintStream sysOut;
+  
+  ByteArrayOutputStream sysErrStream;
+  private PrintStream sysErr;
+
+  @Before
+  public void setUp() {
+    sysOutStream = new ByteArrayOutputStream();
+    sysOut =  new PrintStream(sysOutStream);
+    System.setOut(sysOut);
+    
+    sysErrStream = new ByteArrayOutputStream();
+    sysErr = new PrintStream(sysErrStream);
+    System.setErr(sysErr);
+  }
+
+  @Test(timeout = 5000l)
+  public void testFailResultCodes() throws Exception {
+    Configuration conf = new YarnConfiguration();
+    conf.setClass("fs.file.impl", LocalFileSystem.class, FileSystem.class);
+    LogCLIHelpers cliHelper = new LogCLIHelpers();
+    cliHelper.setConf(conf);
+    YarnClient mockYarnClient = createMockYarnClient(YarnApplicationState.FINISHED);
+    LogsCLI dumper = new LogsCLIForTest(mockYarnClient);
+    dumper.setConf(conf);
+    
+    // verify dumping a non-existent application's logs returns a failure code
+    int exitCode = dumper.run( new String[] {
+        "-applicationId", "application_0_0" } );
+    assertTrue("Should return an error code", exitCode != 0);
+    
+    // verify dumping a non-existent container log is a failure code 
+    exitCode = cliHelper.dumpAContainersLogs("application_0_0", "container_0_0",
+        "nonexistentnode:1234", "nobody");
+    assertTrue("Should return an error code", exitCode != 0);
+  }
+
+  @Test(timeout = 5000l)
+  public void testInvalidApplicationId() throws Exception {
+    Configuration conf = new YarnConfiguration();
+    YarnClient mockYarnClient = createMockYarnClient(YarnApplicationState.FINISHED);
+    LogsCLI cli = new LogsCLIForTest(mockYarnClient);
+    cli.setConf(conf);
+    
+    int exitCode = cli.run( new String[] { "-applicationId", "not_an_app_id"});
+    assertTrue(exitCode == -1);
+    assertTrue(sysErrStream.toString().startsWith("Invalid ApplicationId specified"));
+  }
+
+  @Test(timeout = 5000l)
+  public void testUnknownApplicationId() throws Exception {
+    Configuration conf = new YarnConfiguration();
+    YarnClient mockYarnClient = createMockYarnClientUnknownApp();
+    LogsCLI cli = new LogsCLIForTest(mockYarnClient);
+    cli.setConf(conf);
+
+    int exitCode = cli.run(new String[] { "-applicationId",
+        ApplicationId.newInstance(1, 1).toString() });
+
+    // Error since no logs present for the app.
+    assertTrue(exitCode != 0);
+    assertTrue(sysErrStream.toString().startsWith(
+        "Unable to get ApplicationState"));
+  }
+
+  @Test(timeout = 5000l)
+  public void testHelpMessage() throws Exception {
+    Configuration conf = new YarnConfiguration();
+    YarnClient mockYarnClient = createMockYarnClient(YarnApplicationState.FINISHED);
+    LogsCLI dumper = new LogsCLIForTest(mockYarnClient);
+    dumper.setConf(conf);
+
+    int exitCode = dumper.run(new String[]{});
+    assertTrue(exitCode == -1);
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    PrintWriter pw = new PrintWriter(baos);
+    pw.println("Retrieve logs for completed YARN applications.");
+    pw.println("usage: yarn logs -applicationId <application ID> [OPTIONS]");
+    pw.println();
+    pw.println("general options are:");
+    pw.println(" -appOwner <Application Owner>   AppOwner (assumed to be current user if");
+    pw.println("                                 not specified)");
+    pw.println(" -containerId <Container ID>     ContainerId (must be specified if node");
+    pw.println("                                 address is specified)");
+    pw.println(" -nodeAddress <Node Address>     NodeAddress in the format nodename:port");
+    pw.println("                                 (must be specified if container id is");
+    pw.println("                                 specified)");
+    pw.close();
+    String appReportStr = baos.toString("UTF-8");
+    Assert.assertEquals(appReportStr, sysOutStream.toString());
+  }
+  
+  private YarnClient createMockYarnClient(YarnApplicationState appState)
+      throws YarnException, IOException {
+    YarnClient mockClient = mock(YarnClient.class);
+    ApplicationReport mockAppReport = mock(ApplicationReport.class);
+    doReturn(appState).when(mockAppReport).getYarnApplicationState();
+    doReturn(mockAppReport).when(mockClient).getApplicationReport(
+        any(ApplicationId.class));
+    return mockClient;
+  }
+
+  private YarnClient createMockYarnClientUnknownApp() throws YarnException,
+      IOException {
+    YarnClient mockClient = mock(YarnClient.class);
+    doThrow(new YarnException("Unknown AppId")).when(mockClient)
+        .getApplicationReport(any(ApplicationId.class));
+    return mockClient;
+  }
+
+  private static class LogsCLIForTest extends LogsCLI {
+    
+    private YarnClient yarnClient;
+    
+    public LogsCLIForTest(YarnClient yarnClient) {
+      super();
+      this.yarnClient = yarnClient;
+    }
+
+    protected YarnClient createYarnClient() {
+      return yarnClient;
+    }
+  }
+}

+ 162 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java

@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.logaggregation;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.PrintStream;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+
+public class LogCLIHelpers implements Configurable {
+
+  private Configuration conf;
+
+  @Private
+  @VisibleForTesting
+  public int dumpAContainersLogs(String appId, String containerId,
+      String nodeId, String jobOwner) throws IOException {
+    Path remoteRootLogDir = new Path(getConf().get(
+        YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
+        YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
+    String suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(getConf());
+    Path logPath = LogAggregationUtils.getRemoteNodeLogFileForApp(
+        remoteRootLogDir, ConverterUtils.toApplicationId(appId), jobOwner,
+        ConverterUtils.toNodeId(nodeId), suffix);
+    AggregatedLogFormat.LogReader reader;
+    try {
+      reader = new AggregatedLogFormat.LogReader(getConf(), logPath);
+    } catch (FileNotFoundException fnfe) {
+      System.out.println("Logs not available at " + logPath.toString());
+      System.out
+          .println("Log aggregation has not completed or is not enabled.");
+      return -1;
+    }
+    return dumpAContainerLogs(containerId, reader, System.out);
+  }
+
+  @Private
+  public int dumpAContainerLogs(String containerIdStr,
+      AggregatedLogFormat.LogReader reader, PrintStream out) throws IOException {
+    DataInputStream valueStream;
+    LogKey key = new LogKey();
+    valueStream = reader.next(key);
+
+    while (valueStream != null && !key.toString().equals(containerIdStr)) {
+      // Next container
+      key = new LogKey();
+      valueStream = reader.next(key);
+    }
+
+    if (valueStream == null) {
+      System.out.println("Logs for container " + containerIdStr
+          + " are not present in this log-file.");
+      return -1;
+    }
+
+    while (true) {
+      try {
+        LogReader.readAContainerLogsForALogType(valueStream, out);
+      } catch (EOFException eof) {
+        break;
+      }
+    }
+    return 0;
+  }
+
+  @Private
+  public int dumpAllContainersLogs(ApplicationId appId, String appOwner,
+      PrintStream out) throws IOException {
+    Path remoteRootLogDir = new Path(getConf().get(
+        YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
+        YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
+    String user = appOwner;
+    String logDirSuffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(getConf());
+    // TODO Change this to get a list of files from the LAS.
+    Path remoteAppLogDir = LogAggregationUtils.getRemoteAppLogDir(
+        remoteRootLogDir, appId, user, logDirSuffix);
+    RemoteIterator<FileStatus> nodeFiles;
+    try {
+      nodeFiles = FileContext.getFileContext().listStatus(remoteAppLogDir);
+    } catch (FileNotFoundException fnf) {
+      System.out.println("Logs not available at " + remoteAppLogDir.toString());
+      System.out
+          .println("Log aggregation has not completed or is not enabled.");
+      return -1;
+    }
+    while (nodeFiles.hasNext()) {
+      FileStatus thisNodeFile = nodeFiles.next();
+      AggregatedLogFormat.LogReader reader = new AggregatedLogFormat.LogReader(
+          getConf(), new Path(remoteAppLogDir, thisNodeFile.getPath().getName()));
+      try {
+
+        DataInputStream valueStream;
+        LogKey key = new LogKey();
+        valueStream = reader.next(key);
+
+        while (valueStream != null) {
+          String containerString = "\n\nContainer: " + key + " on "
+              + thisNodeFile.getPath().getName();
+          out.println(containerString);
+          out.println(StringUtils.repeat("=", containerString.length()));
+          while (true) {
+            try {
+              LogReader.readAContainerLogsForALogType(valueStream, out);
+            } catch (EOFException eof) {
+              break;
+            }
+          }
+
+          // Next container
+          key = new LogKey();
+          valueStream = reader.next(key);
+        }
+      } finally {
+        reader.close();
+      }
+    }
+    return 0;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return this.conf;
+  }
+}

+ 0 - 89
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestLogDumper.java

@@ -1,89 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.hadoop.yarn.logaggregation;
-
-import static org.junit.Assert.assertTrue;
-import java.io.ByteArrayOutputStream;
-import java.io.PrintStream;
-import java.io.PrintWriter;
-
-import junit.framework.Assert;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.junit.Before;
-import org.junit.Test;
-
-public class TestLogDumper {
-  ByteArrayOutputStream sysOutStream;
-  private PrintStream sysOut;
-
-  @Before
-  public void setUp() {
-    sysOutStream = new ByteArrayOutputStream();
-    sysOut =  new PrintStream(sysOutStream);
-    System.setOut(sysOut);
-  }
-
-  @Test
-  public void testFailResultCodes() throws Exception {
-    Configuration conf = new YarnConfiguration();
-    conf.setClass("fs.file.impl", LocalFileSystem.class, FileSystem.class);
-    LogDumper dumper = new LogDumper();
-    dumper.setConf(conf);
-    
-    // verify dumping a non-existent application's logs returns a failure code
-    int exitCode = dumper.run( new String[] {
-        "-applicationId", "application_0_0" } );
-    assertTrue("Should return an error code", exitCode != 0);
-    
-    // verify dumping a non-existent container log is a failure code 
-    exitCode = dumper.dumpAContainersLogs("application_0_0", "container_0_0",
-        "nonexistentnode:1234", "nobody");
-    assertTrue("Should return an error code", exitCode != 0);
-  }
-
-  @Test
-  public void testHelpMessage() throws Exception {
-    Configuration conf = new YarnConfiguration();
-    LogDumper dumper = new LogDumper();
-    dumper.setConf(conf);
-
-    int exitCode = dumper.run(new String[]{});
-    assertTrue(exitCode == -1);
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    PrintWriter pw = new PrintWriter(baos);
-    pw.println("Retrieve logs for completed YARN applications.");
-    pw.println("usage: yarn logs -applicationId <application ID> [OPTIONS]");
-    pw.println();
-    pw.println("general options are:");
-    pw.println(" -appOwner <Application Owner>   AppOwner (assumed to be current user if");
-    pw.println("                                 not specified)");
-    pw.println(" -containerId <Container ID>     ContainerId (must be specified if node");
-    pw.println("                                 address is specified)");
-    pw.println(" -nodeAddress <Node Address>     NodeAddress in the format nodename:port");
-    pw.println("                                 (must be specified if container id is");
-    pw.println("                                 specified)");
-    pw.close();
-    String appReportStr = baos.toString("UTF-8");
-    Assert.assertEquals(appReportStr, sysOutStream.toString());
-  }
-}