
YARN-10855. yarn logs cli fails to retrieve logs if any TFile is corrupt or empty. Contributed by Jim Brennan.

zhuqi-lucas, 3 years ago
Commit 0ac443b1f8

+ 30 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java

@@ -396,7 +396,9 @@ public class TestLogsCLI {
     ContainerId containerId1 = ContainerId.newContainerId(appAttemptId1, 1);
     ContainerId containerId2 = ContainerId.newContainerId(appAttemptId1, 2);
     ContainerId containerId3 = ContainerId.newContainerId(appAttemptId2, 3);
+    ContainerId containerId4 = ContainerId.newContainerId(appAttemptId2, 4);
     final NodeId nodeId = NodeId.newInstance("localhost", 1234);
+    final NodeId badNodeId = NodeId.newInstance("badhost", 5678);
 
     // create local logs
     String rootLogDir = "target/LocalLogs";
@@ -449,6 +451,8 @@ public class TestLogsCLI {
       containerId2, path, fs);
     uploadContainerLogIntoRemoteDir(ugi, conf, rootLogDirs, nodeId,
       containerId3, path, fs);
+    uploadTruncatedTFileIntoRemoteDir(ugi, conf, badNodeId,
+        containerId4, fs);
 
     YarnClient mockYarnClient =
         createMockYarnClient(
@@ -801,6 +805,17 @@
         "Invalid ContainerId specified"));
     sysErrStream.reset();
 
+    // Uploaded the empty log for container4. We should see a message
+    // showing the log for container4 is not present.
+    exitCode =
+        cli.run(new String[] {"-applicationId", appId.toString(),
+            "-nodeAddress", badNodeId.toString(), "-containerId",
+            containerId4.toString()});
+    assertTrue(exitCode == -1);
+    assertTrue(sysErrStream.toString().contains(
+        "Can not find any log file matching the pattern"));
+    sysErrStream.reset();
+
     fs.delete(new Path(remoteLogRootDir), true);
     fs.delete(new Path(rootLogDir), true);
   }
@@ -1820,6 +1835,21 @@ public class TestLogsCLI {
     }
   }
 
+  private static void uploadTruncatedTFileIntoRemoteDir(
+      UserGroupInformation ugi, Configuration configuration,
+      NodeId nodeId, ContainerId containerId,
+      FileSystem fs) throws Exception {
+    LogAggregationFileControllerFactory factory
+        = new LogAggregationFileControllerFactory(configuration);
+    LogAggregationFileController fileFormat = factory
+        .getFileControllerForWrite();
+    ApplicationId appId = containerId.getApplicationAttemptId()
+        .getApplicationId();
+    Path path = fileFormat.getRemoteNodeLogFileForApp(
+        appId, ugi.getCurrentUser().getShortUserName(), nodeId);
+    fs.create(path, true).close();
+  }
+
   private LogsCLI createCli() throws IOException, YarnException {
     YarnClient mockYarnClient =
             createMockYarnClient(YarnApplicationState.FINISHED,
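For context (not part of the patch): the truncated "upload" above simply writes a zero-length file at the aggregated-log path, so there is no TFile trailer to parse, and constructing a LogReader on it is expected to fail with an IOException. A minimal sketch of that check; the class and method names are hypothetical, only the LogReader API is taken from this page:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;

final class EmptyTFileSketch {
  /** Returns true if the aggregated log at 'path' parses as a TFile. */
  static boolean isReadable(Configuration conf, Path path) {
    // The LogReader constructor parses the TFile metadata, so an empty or
    // truncated file is expected to fail here, before the first next() call.
    try (LogReader reader = new LogReader(conf, path)) {
      return true;
    } catch (IOException e) {
      return false;  // empty/corrupt file: exactly what the new test uploads
    }
  }
}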

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java

@@ -565,7 +565,7 @@ public class AggregatedLogFormat {
 
   @Public
   @Evolving
-  public static class LogReader {
+  public static class LogReader implements AutoCloseable {
 
     private final FSDataInputStream fsDataIStream;
     private final TFile.Reader.Scanner scanner;
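The only change here is declaring that LogReader implements AutoCloseable (its existing close() already satisfies the interface), which is what enables the try-with-resources rewrites in the next file. A minimal sketch of the resulting calling pattern; the wrapper class and method are placeholders, while LogReader, LogKey, and next() come from the diffs on this page:

import java.io.DataInputStream;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;

final class LogReaderScanSketch {
  /** Prints the container keys stored in one aggregated log file. */
  static void listKeys(Configuration conf, Path nodeLogFile) throws IOException {
    // The reader is now closed automatically, even if next() throws mid-scan.
    try (LogReader reader = new LogReader(conf, nodeLogFile)) {
      LogKey key = new LogKey();
      DataInputStream valueStream = reader.next(key);
      while (valueStream != null) {
        System.out.println("container: " + key);
        key = new LogKey();
        valueStream = reader.next(key);
      }
    }
  }
}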

+ 12 - 21
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java

@@ -46,7 +46,6 @@ import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
@@ -193,10 +192,7 @@ public class LogAggregationTFileController
       if ((nodeId == null || nodeName.contains(LogAggregationUtils
           .getNodeString(nodeId))) && !nodeName.endsWith(
               LogAggregationUtils.TMP_FILE_SUFFIX)) {
-        AggregatedLogFormat.LogReader reader = null;
-        try {
-          reader = new AggregatedLogFormat.LogReader(conf,
-              thisNodeFile.getPath());
+        try (LogReader reader = new LogReader(conf, thisNodeFile.getPath())) {
           DataInputStream valueStream;
           LogKey key = new LogKey();
           valueStream = reader.next(key);
@@ -251,10 +247,10 @@ public class LogAggregationTFileController
             key = new LogKey();
             valueStream = reader.next(key);
           }
-        } finally {
-          if (reader != null) {
-            reader.close();
-          }
+        } catch (IOException ex) {
+          LOG.error("Skipping empty or corrupt file " +
+              thisNodeFile.getPath(), ex);
+          continue; // skip empty or corrupt files
         }
       }
     }
@@ -268,10 +264,7 @@ public class LogAggregationTFileController
     Map<String, List<ContainerLogFileInfo>> logMetaFiles = new HashMap<>();
     Path nodePath = currentNodeFile.getPath();
 
-    LogReader reader =
-        new LogReader(conf,
-            nodePath);
-    try {
+    try (LogReader reader = new LogReader(conf, nodePath)) {
       DataInputStream valueStream;
       LogKey key = new LogKey();
       valueStream = reader.next(key);
@@ -286,8 +279,6 @@ public class LogAggregationTFileController
         key = new LogKey();
         valueStream = reader.next(key);
       }
-    } finally {
-      reader.close();
     }
     return logMetaFiles;
   }
@@ -349,10 +340,8 @@ public class LogAggregationTFileController
       }
       if (!thisNodeFile.getPath().getName()
           .endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) {
-        AggregatedLogFormat.LogReader reader =
-            new AggregatedLogFormat.LogReader(conf,
-            thisNodeFile.getPath());
-        try {
+        try (LogReader reader = new LogReader(conf,
+            thisNodeFile.getPath())) {
           DataInputStream valueStream;
           LogKey key = new LogKey();
           valueStream = reader.next(key);
@@ -383,8 +372,10 @@ public class LogAggregationTFileController
             key = new LogKey();
             valueStream = reader.next(key);
           }
-        } finally {
-          reader.close();
+        } catch (IOException ex) {
+          LOG.error("Skipping empty or corrupt file " +
+              thisNodeFile.getPath(), ex);
+          continue; // skip empty or corrupt files
         }
       }
     }
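Taken together, the pattern applied throughout this file is: open each per-node aggregated log with try-with-resources, and treat an IOException from that file as "log and skip" instead of letting it propagate, which is what previously made yarn logs fail outright when any TFile was corrupt or empty. A condensed sketch of that loop; the logger, the nodeFiles list, and the class/method names are assumptions, not taken from the patch:

import java.io.DataInputStream;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class SkipCorruptTFilesSketch {
  private static final Logger LOG =
      LoggerFactory.getLogger(SkipCorruptTFilesSketch.class);

  /** Scans each aggregated log file, skipping any that cannot be read. */
  static void scanAll(Configuration conf, List<FileStatus> nodeFiles) {
    for (FileStatus nodeFile : nodeFiles) {
      try (LogReader reader = new LogReader(conf, nodeFile.getPath())) {
        LogKey key = new LogKey();
        DataInputStream valueStream = reader.next(key);
        while (valueStream != null) {
          // ... consume this container's logs here ...
          key = new LogKey();
          valueStream = reader.next(key);
        }
      } catch (IOException ex) {
        // Pre-patch, this exception propagated and aborted the whole run.
        LOG.error("Skipping empty or corrupt file " + nodeFile.getPath(), ex);
      }
    }
  }
}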