Browse Source

YARN-6288. Exceptions during aggregated log writes are mishandled. Contributed by Akira Ajisaka

Jason Lowe 8 years ago
parent
commit
6e41aec109

+ 29 - 29
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java

@@ -390,18 +390,18 @@ public class TestLogsCLI {
     Path path =
     Path path =
         new Path(appDir, LogAggregationUtils.getNodeString(nodeId)
         new Path(appDir, LogAggregationUtils.getNodeString(nodeId)
             + System.currentTimeMillis());
             + System.currentTimeMillis());
-    AggregatedLogFormat.LogWriter writer =
-        new AggregatedLogFormat.LogWriter(configuration, path, ugi);
-    writer.writeApplicationOwner(ugi.getUserName());
-
-    Map<ApplicationAccessType, String> appAcls =
-        new HashMap<ApplicationAccessType, String>();
-    appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName());
-    writer.writeApplicationACLs(appAcls);
-    writer.append(new AggregatedLogFormat.LogKey(containerId),
-      new AggregatedLogFormat.LogValue(rootLogDirs, containerId,
-        UserGroupInformation.getCurrentUser().getShortUserName()));
-    writer.close();
+    try (AggregatedLogFormat.LogWriter writer =
+             new AggregatedLogFormat.LogWriter()) {
+      writer.initialize(configuration, path, ugi);
+      writer.writeApplicationOwner(ugi.getUserName());
+
+      Map<ApplicationAccessType, String> appAcls = new HashMap<>();
+      appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName());
+      writer.writeApplicationACLs(appAcls);
+      writer.append(new AggregatedLogFormat.LogKey(containerId),
+          new AggregatedLogFormat.LogValue(rootLogDirs, containerId,
+              UserGroupInformation.getCurrentUser().getShortUserName()));
+    }
   }
   }
 
 
   private static void uploadEmptyContainerLogIntoRemoteDir(UserGroupInformation ugi,
   private static void uploadEmptyContainerLogIntoRemoteDir(UserGroupInformation ugi,
@@ -410,23 +410,23 @@ public class TestLogsCLI {
     Path path =
     Path path =
         new Path(appDir, LogAggregationUtils.getNodeString(nodeId)
         new Path(appDir, LogAggregationUtils.getNodeString(nodeId)
             + System.currentTimeMillis());
             + System.currentTimeMillis());
-    AggregatedLogFormat.LogWriter writer =
-        new AggregatedLogFormat.LogWriter(configuration, path, ugi);
-    writer.writeApplicationOwner(ugi.getUserName());
-
-    Map<ApplicationAccessType, String> appAcls =
-        new HashMap<ApplicationAccessType, String>();
-    appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName());
-    writer.writeApplicationACLs(appAcls);
-    DataOutputStream out = writer.getWriter().prepareAppendKey(-1);
-    new AggregatedLogFormat.LogKey(containerId).write(out);
-    out.close();
-    out = writer.getWriter().prepareAppendValue(-1);
-    new AggregatedLogFormat.LogValue(rootLogDirs, containerId,
-      UserGroupInformation.getCurrentUser().getShortUserName()).write(out,
-      new HashSet<File>());
-    out.close();
-    writer.close();
+    try (AggregatedLogFormat.LogWriter writer =
+             new AggregatedLogFormat.LogWriter()) {
+      writer.initialize(configuration, path, ugi);
+      writer.writeApplicationOwner(ugi.getUserName());
+
+      Map<ApplicationAccessType, String> appAcls = new HashMap<>();
+      appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName());
+      writer.writeApplicationACLs(appAcls);
+      DataOutputStream out = writer.getWriter().prepareAppendKey(-1);
+      new AggregatedLogFormat.LogKey(containerId).write(out);
+      out.close();
+      out = writer.getWriter().prepareAppendValue(-1);
+      new AggregatedLogFormat.LogValue(rootLogDirs, containerId,
+          UserGroupInformation.getCurrentUser().getShortUserName()).write(out,
+          new HashSet<File>());
+      out.close();
+    }
   }
   }
 
 
   private YarnClient createMockYarnClient(YarnApplicationState appState)
   private YarnClient createMockYarnClient(YarnApplicationState appState)

+ 21 - 10
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java

@@ -70,7 +70,6 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.LogAggregationContext;
 import org.apache.hadoop.yarn.api.records.LogAggregationContext;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Times;
 import org.apache.hadoop.yarn.util.Times;
 
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.annotations.VisibleForTesting;
@@ -378,14 +377,23 @@ public class AggregatedLogFormat {
    * The writer that writes out the aggregated logs.
    * The writer that writes out the aggregated logs.
    */
    */
   @Private
   @Private
-  public static class LogWriter {
+  public static class LogWriter implements AutoCloseable {
 
 
-    private final FSDataOutputStream fsDataOStream;
-    private final TFile.Writer writer;
+    private FSDataOutputStream fsDataOStream;
+    private TFile.Writer writer;
     private FileContext fc;
     private FileContext fc;
 
 
-    public LogWriter(final Configuration conf, final Path remoteAppLogFile,
-        UserGroupInformation userUgi) throws IOException {
+    /**
+     * Initialize the LogWriter.
+     * Must be called just after the instance is created.
+     * @param conf Configuration
+     * @param remoteAppLogFile remote log file path
+     * @param userUgi Ugi of the user
+     * @throws IOException Failed to initialize
+     */
+    public void initialize(final Configuration conf,
+                           final Path remoteAppLogFile,
+                           UserGroupInformation userUgi) throws IOException {
       try {
       try {
         this.fsDataOStream =
         this.fsDataOStream =
             userUgi.doAs(new PrivilegedExceptionAction<FSDataOutputStream>() {
             userUgi.doAs(new PrivilegedExceptionAction<FSDataOutputStream>() {
@@ -463,11 +471,14 @@ public class AggregatedLogFormat {
       }
       }
     }
     }
 
 
+    @Override
     public void close() {
     public void close() {
-      try {
-        this.writer.close();
-      } catch (IOException e) {
-        LOG.warn("Exception closing writer", e);
+      if (writer != null) {
+        try {
+          this.writer.close();
+        } catch (IOException e) {
+          LOG.warn("Exception closing writer", e);
+        }
       }
       }
       IOUtils.closeStream(fsDataOStream);
       IOUtils.closeStream(fsDataOStream);
     }
     }

+ 60 - 60
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java

@@ -140,43 +140,44 @@ public class TestAggregatedLogFormat {
     final int ch = filler;
     final int ch = filler;
 
 
     UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
     UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-    LogWriter logWriter = new LogWriter(conf, remoteAppLogFile, ugi);
+    try (LogWriter logWriter = new LogWriter()) {
+      logWriter.initialize(conf, remoteAppLogFile, ugi);
 
 
-    LogKey logKey = new LogKey(testContainerId);
-    LogValue logValue =
-        spy(new LogValue(Collections.singletonList(srcFileRoot.toString()),
-            testContainerId, ugi.getShortUserName()));
+      LogKey logKey = new LogKey(testContainerId);
+      LogValue logValue =
+          spy(new LogValue(Collections.singletonList(srcFileRoot.toString()),
+              testContainerId, ugi.getShortUserName()));
 
 
-    final CountDownLatch latch = new CountDownLatch(1);
+      final CountDownLatch latch = new CountDownLatch(1);
 
 
-    Thread t = new Thread() {
-      public void run() {
-        try {
-          for(int i=0; i < length/3; i++) {
+      Thread t = new Thread() {
+        public void run() {
+          try {
+            for (int i = 0; i < length / 3; i++) {
               osw.write(ch);
               osw.write(ch);
-          }
+            }
 
 
-          latch.countDown();
+            latch.countDown();
 
 
-          for(int i=0; i < (2*length)/3; i++) {
-            osw.write(ch);
+            for (int i = 0; i < (2 * length) / 3; i++) {
+              osw.write(ch);
+            }
+            osw.close();
+          } catch (IOException e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
           }
           }
-          osw.close();
-        } catch (IOException e) {
-          // TODO Auto-generated catch block
-          e.printStackTrace();
         }
         }
-      }
-    };
-    t.start();
+      };
+      t.start();
 
 
-    //Wait till the osw is partially written
-    //aggregation starts once the ows has completed 1/3rd of its work
-    latch.await();
+      //Wait till the osw is partially written
+      //aggregation starts once the ows has completed 1/3rd of its work
+      latch.await();
 
 
-    //Aggregate The Logs
-    logWriter.append(logKey, logValue);
-    logWriter.close();
+      //Aggregate The Logs
+      logWriter.append(logKey, logValue);
+    }
   }
   }
 
 
   @Test
   @Test
@@ -215,22 +216,22 @@ public class TestAggregatedLogFormat {
     writeSrcFile(srcFilePath, "stdout", numChars);
     writeSrcFile(srcFilePath, "stdout", numChars);
 
 
     UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
     UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-    LogWriter logWriter = new LogWriter(conf, remoteAppLogFile, ugi);
-
-    LogKey logKey = new LogKey(testContainerId);
-    LogValue logValue =
-        new LogValue(Collections.singletonList(srcFileRoot.toString()),
-            testContainerId, ugi.getShortUserName());
-
-    // When we try to open FileInputStream for stderr, it will throw out an IOException.
-    // Skip the log aggregation for stderr.
-    LogValue spyLogValue = spy(logValue);
-    File errorFile = new File((new Path(srcFilePath, "stderr")).toString());
-    doThrow(new IOException("Mock can not open FileInputStream")).when(
-      spyLogValue).secureOpenFile(errorFile);
-
-    logWriter.append(logKey, spyLogValue);
-    logWriter.close();
+    try (LogWriter logWriter = new LogWriter()) {
+      logWriter.initialize(conf, remoteAppLogFile, ugi);
+      LogKey logKey = new LogKey(testContainerId);
+      LogValue logValue =
+          new LogValue(Collections.singletonList(srcFileRoot.toString()),
+              testContainerId, ugi.getShortUserName());
+
+      // When we try to open FileInputStream for stderr, it will throw out an
+      // IOException. Skip the log aggregation for stderr.
+      LogValue spyLogValue = spy(logValue);
+      File errorFile = new File((new Path(srcFilePath, "stderr")).toString());
+      doThrow(new IOException("Mock can not open FileInputStream")).when(
+          spyLogValue).secureOpenFile(errorFile);
+
+      logWriter.append(logKey, spyLogValue);
+    }
 
 
     // make sure permission are correct on the file
     // make sure permission are correct on the file
     FileStatus fsStatus =  fs.getFileStatus(remoteAppLogFile);
     FileStatus fsStatus =  fs.getFileStatus(remoteAppLogFile);
@@ -310,24 +311,23 @@ public class TestAggregatedLogFormat {
 
 
     UserGroupInformation ugi =
     UserGroupInformation ugi =
         UserGroupInformation.getCurrentUser();
         UserGroupInformation.getCurrentUser();
-    LogWriter logWriter = new LogWriter(conf, remoteAppLogFile, ugi);
+    try (LogWriter logWriter = new LogWriter()) {
+      logWriter.initialize(conf, remoteAppLogFile, ugi);
+      LogKey logKey = new LogKey(testContainerId1);
+      String randomUser = "randomUser";
+      LogValue logValue =
+          spy(new LogValue(Collections.singletonList(srcFileRoot.toString()),
+              testContainerId1, randomUser));
+
+      // It is trying simulate a situation where first log file is owned by
+      // different user (probably symlink) and second one by the user itself.
+      // The first file should not be aggregated. Because this log file has the
+      // invalid user name.
+      when(logValue.getUser()).thenReturn(randomUser).thenReturn(
+          ugi.getShortUserName());
+      logWriter.append(logKey, logValue);
+    }
 
 
-    LogKey logKey = new LogKey(testContainerId1);
-    String randomUser = "randomUser";
-    LogValue logValue =
-        spy(new LogValue(Collections.singletonList(srcFileRoot.toString()),
-            testContainerId1, randomUser));
-    
-    // It is trying simulate a situation where first log file is owned by
-    // different user (probably symlink) and second one by the user itself.
-    // The first file should not be aggregated. Because this log file has the invalid
-    // user name.
-    when(logValue.getUser()).thenReturn(randomUser).thenReturn(
-        ugi.getShortUserName());
-    logWriter.append(logKey, logValue);
-
-    logWriter.close();
-    
     BufferedReader in =
     BufferedReader in =
         new BufferedReader(new FileReader(new File(remoteAppLogFile
         new BufferedReader(new FileReader(new File(remoteAppLogFile
             .toUri().getRawPath())));
             .toUri().getRawPath())));

+ 13 - 11
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java

@@ -275,17 +275,19 @@ public class TestAggregatedLogsBlock {
     List<String> rootLogDirs = Arrays.asList("target/logs/logs");
     List<String> rootLogDirs = Arrays.asList("target/logs/logs");
     UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
     UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
 
 
-    AggregatedLogFormat.LogWriter writer = new AggregatedLogFormat.LogWriter(
-        configuration, new Path(path), ugi);
-    writer.writeApplicationOwner(ugi.getUserName());
-
-    Map<ApplicationAccessType, String> appAcls = new HashMap<ApplicationAccessType, String>();
-    appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName());
-    writer.writeApplicationACLs(appAcls);
-
-    writer.append(new AggregatedLogFormat.LogKey("container_0_0001_01_000001"),
-        new AggregatedLogFormat.LogValue(rootLogDirs, containerId,UserGroupInformation.getCurrentUser().getShortUserName()));
-    writer.close();
+    try (AggregatedLogFormat.LogWriter writer =
+             new AggregatedLogFormat.LogWriter()) {
+      writer.initialize(configuration, new Path(path), ugi);
+      writer.writeApplicationOwner(ugi.getUserName());
+
+      Map<ApplicationAccessType, String> appAcls = new HashMap<ApplicationAccessType, String>();
+      appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName());
+      writer.writeApplicationACLs(appAcls);
+
+      writer.append(new AggregatedLogFormat.LogKey("container_0_0001_01_000001"),
+          new AggregatedLogFormat.LogValue(rootLogDirs, containerId,
+              UserGroupInformation.getCurrentUser().getShortUserName()));
+    }
   }
   }
 
 
   private void writeLogs(String dirName) throws Exception {
   private void writeLogs(String dirName) throws Exception {

+ 31 - 37
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java

@@ -68,7 +68,6 @@ import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
-import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.hadoop.yarn.util.Times;
 import org.apache.hadoop.yarn.util.Times;
 
 
@@ -313,24 +312,21 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
       }
       }
     }
     }
 
 
-    LogWriter writer = null;
+    if (pendingContainerInThisCycle.isEmpty()) {
+      sendLogAggregationReport(true, "", appFinished);
+      return;
+    }
+
+    logAggregationTimes++;
     String diagnosticMessage = "";
     String diagnosticMessage = "";
     boolean logAggregationSucceedInThisCycle = true;
     boolean logAggregationSucceedInThisCycle = true;
-    try {
-      if (pendingContainerInThisCycle.isEmpty()) {
-        return;
-      }
-
-      logAggregationTimes++;
-
+    try (LogWriter writer = new LogWriter()){
       try {
       try {
-        writer =
-            new LogWriter(this.conf, this.remoteNodeTmpLogFileForApp,
-              this.userUgi);
+        writer.initialize(this.conf, this.remoteNodeTmpLogFileForApp,
+            this.userUgi);
         // Write ACLs once when the writer is created.
         // Write ACLs once when the writer is created.
         writer.writeApplicationACLs(appAcls);
         writer.writeApplicationACLs(appAcls);
         writer.writeApplicationOwner(this.userUgi.getShortUserName());
         writer.writeApplicationOwner(this.userUgi.getShortUserName());
-
       } catch (IOException e1) {
       } catch (IOException e1) {
         logAggregationSucceedInThisCycle = false;
         logAggregationSucceedInThisCycle = false;
         LOG.error("Cannot create writer for app " + this.applicationId
         LOG.error("Cannot create writer for app " + this.applicationId
@@ -371,11 +367,6 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
         cleanupOldLogTimes++;
         cleanupOldLogTimes++;
       }
       }
 
 
-      if (writer != null) {
-        writer.close();
-        writer = null;
-      }
-
       long currentTime = System.currentTimeMillis();
       long currentTime = System.currentTimeMillis();
       final Path renamedPath = this.rollingMonitorInterval <= 0
       final Path renamedPath = this.rollingMonitorInterval <= 0
               ? remoteNodeLogFileForApp : new Path(
               ? remoteNodeLogFileForApp : new Path(
@@ -416,29 +407,32 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
         logAggregationSucceedInThisCycle = false;
         logAggregationSucceedInThisCycle = false;
       }
       }
     } finally {
     } finally {
-      LogAggregationStatus logAggregationStatus =
-          logAggregationSucceedInThisCycle
-              ? LogAggregationStatus.RUNNING
-              : LogAggregationStatus.RUNNING_WITH_FAILURE;
-      sendLogAggregationReport(logAggregationStatus, diagnosticMessage);
-      if (appFinished) {
-        // If the app is finished, one extra final report with log aggregation
-        // status SUCCEEDED/FAILED will be sent to RM to inform the RM
-        // that the log aggregation in this NM is completed.
-        LogAggregationStatus finalLogAggregationStatus =
-            renameTemporaryLogFileFailed || !logAggregationSucceedInThisCycle
-                ? LogAggregationStatus.FAILED
-                : LogAggregationStatus.SUCCEEDED;
-        sendLogAggregationReport(finalLogAggregationStatus, "");
-      }
-
-      if (writer != null) {
-        writer.close();
-      }
+      sendLogAggregationReport(logAggregationSucceedInThisCycle,
+          diagnosticMessage, appFinished);
     }
     }
   }
   }
 
 
   private void sendLogAggregationReport(
   private void sendLogAggregationReport(
+      boolean logAggregationSucceedInThisCycle, String diagnosticMessage,
+      boolean appFinished) {
+    LogAggregationStatus logAggregationStatus =
+        logAggregationSucceedInThisCycle
+            ? LogAggregationStatus.RUNNING
+            : LogAggregationStatus.RUNNING_WITH_FAILURE;
+    sendLogAggregationReportInternal(logAggregationStatus, diagnosticMessage);
+    if (appFinished) {
+      // If the app is finished, one extra final report with log aggregation
+      // status SUCCEEDED/FAILED will be sent to RM to inform the RM
+      // that the log aggregation in this NM is completed.
+      LogAggregationStatus finalLogAggregationStatus =
+          renameTemporaryLogFileFailed || !logAggregationSucceedInThisCycle
+              ? LogAggregationStatus.FAILED
+              : LogAggregationStatus.SUCCEEDED;
+      sendLogAggregationReportInternal(finalLogAggregationStatus, "");
+    }
+  }
+
+  private void sendLogAggregationReportInternal(
       LogAggregationStatus logAggregationStatus, String diagnosticMessage) {
       LogAggregationStatus logAggregationStatus, String diagnosticMessage) {
     LogAggregationReport report =
     LogAggregationReport report =
         Records.newRecord(LogAggregationReport.class);
         Records.newRecord(LogAggregationReport.class);