Browse Source

YARN-2724. Skipped uploading a local log file to HDFS if exception is raised when opening it. Contributed by Xuan Gong.

(cherry picked from commit e31f0a6558b106662c83e1f797216e412b6689a9)
Zhijie Shen 10 năm trước cách đây
mục cha
commit
38fa39aa17

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -673,6 +673,9 @@ Release 2.6.0 - UNRELEASED
 
     YARN-2732. Fixed syntax error in SecureContainer.apt.vm. (Jian He via zjshen)
 
+    YARN-2724. Skipped uploading a local log file to HDFS if exception is raised
+    when opening it. (Xuan Gong via zjshen)
+
 Release 2.5.1 - 2014-09-05
 
   INCOMPATIBLE CHANGES

+ 25 - 8
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java

@@ -69,6 +69,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
@@ -211,6 +212,16 @@ public class AggregatedLogFormat {
       Collections.sort(fileList);
 
       for (File logFile : fileList) {
+
+        FileInputStream in = null;
+        try {
+          in = secureOpenFile(logFile);
+        } catch (IOException e) {
+          logErrorMessage(logFile, e);
+          IOUtils.cleanup(LOG, in);
+          continue;
+        }
+
         final long fileLength = logFile.length();
         // Write the logFile Type
         out.writeUTF(logFile.getName());
@@ -219,9 +230,7 @@ public class AggregatedLogFormat {
         out.writeUTF(String.valueOf(fileLength));
 
         // Write the log itself
-        FileInputStream in = null;
         try {
-          in = SecureIOUtils.openForRead(logFile, getUser(), null);
           byte[] buf = new byte[65535];
           int len = 0;
           long bytesLeft = fileLength;
@@ -244,18 +253,26 @@ public class AggregatedLogFormat {
           }
           this.uploadedFiles.add(logFile);
         } catch (IOException e) {
-          String message = "Error aggregating log file. Log file : "
-              + logFile.getAbsolutePath() + e.getMessage();
-          LOG.error(message, e);
+          String message = logErrorMessage(logFile, e);
           out.write(message.getBytes());
         } finally {
-          if (in != null) {
-            in.close();
-          }
+          IOUtils.cleanup(LOG, in);
         }
       }
     }
 
+    @VisibleForTesting
+    public FileInputStream secureOpenFile(File logFile) throws IOException {
+      return SecureIOUtils.openForRead(logFile, getUser(), null);
+    }
+
+    private static String logErrorMessage(File logFile, Exception e) {
+      String message = "Error aggregating log file. Log file : "
+          + logFile.getAbsolutePath() + ". " + e.getMessage();
+      LOG.error(message, e);
+      return message;
+    }
+
     // Added for testing purpose.
     public String getUser() {
       return user;

+ 15 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.logaggregation;
 
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.doThrow;
 
 import java.io.BufferedReader;
 import java.io.DataInputStream;
@@ -37,7 +38,6 @@ import java.util.Collections;
 import java.util.concurrent.CountDownLatch;
 
 import org.junit.Assert;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -194,6 +194,8 @@ public class TestAggregatedLogFormat {
 
     int numChars = 80000;
 
+    // create file stderr and stdout in containerLogDir
+    writeSrcFile(srcFilePath, "stderr", numChars);
     writeSrcFile(srcFilePath, "stdout", numChars);
 
     UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
@@ -204,7 +206,14 @@ public class TestAggregatedLogFormat {
         new LogValue(Collections.singletonList(srcFileRoot.toString()),
             testContainerId, ugi.getShortUserName());
 
-    logWriter.append(logKey, logValue);
+    // When we try to open FileInputStream for stderr, it will throw out an IOException.
+    // Skip the log aggregation for stderr.
+    LogValue spyLogValue = spy(logValue);
+    File errorFile = new File((new Path(srcFilePath, "stderr")).toString());
+    doThrow(new IOException("Mock can not open FileInputStream")).when(
+      spyLogValue).secureOpenFile(errorFile);
+
+    logWriter.append(logKey, spyLogValue);
     logWriter.close();
 
     // make sure permission are correct on the file
@@ -218,11 +227,15 @@ public class TestAggregatedLogFormat {
     Writer writer = new StringWriter();
     LogReader.readAcontainerLogs(dis, writer);
     
+    // We should only do the log aggregation for stdout.
+    // Since we could not open the fileInputStream for stderr, this file is not
+    // aggregated.
     String s = writer.toString();
     int expectedLength =
         "\n\nLogType:stdout".length() + ("\nLogLength:" + numChars).length()
             + "\nLog Contents:\n".length() + numChars;
     Assert.assertTrue("LogType not matched", s.contains("LogType:stdout"));
+    Assert.assertTrue("log file:stderr should not be aggregated.", !s.contains("LogType:stderr"));
     Assert.assertTrue("LogLength not matched", s.contains("LogLength:" + numChars));
     Assert.assertTrue("Log Contents not matched", s.contains("Log Contents"));