瀏覽代碼

YARN-10207. CLOSE_WAIT socket connection leaks during rendering of (corrupted) aggregated logs on the JobHistoryServer Web UI. Contributed by Siddharth Ahuja

Szilard Nemeth 5 年之前
父節點
當前提交
bffb43b00e

+ 11 - 7
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java

@@ -579,13 +579,17 @@ public class AggregatedLogFormat {
 
     public LogReader(Configuration conf, Path remoteAppLogFile)
         throws IOException {
-      FileContext fileContext =
-          FileContext.getFileContext(remoteAppLogFile.toUri(), conf);
-      this.fsDataIStream = fileContext.open(remoteAppLogFile);
-      reader =
-          new TFile.Reader(this.fsDataIStream, fileContext.getFileStatus(
-              remoteAppLogFile).getLen(), conf);
-      this.scanner = reader.createScanner();
+      try {
+        FileContext fileContext =
+            FileContext.getFileContext(remoteAppLogFile.toUri(), conf);
+        this.fsDataIStream = fileContext.open(remoteAppLogFile);
+        reader = new TFile.Reader(this.fsDataIStream,
+            fileContext.getFileStatus(remoteAppLogFile).getLen(), conf);
+        this.scanner = reader.createScanner();
+      } catch (IOException ioe) {
+        close();
+        throw new IOException("Error in creating LogReader", ioe);
+      }
     }
 
     private boolean atBeginning = true;

+ 18 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java

@@ -33,6 +33,10 @@ import java.io.OutputStreamWriter;
 import java.io.StringWriter;
 import java.io.UnsupportedEncodingException;
 import java.io.Writer;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.concurrent.CountDownLatch;
@@ -121,6 +125,20 @@ public class TestAggregatedLogFormat {
         Assert.fail("Aggregated logs are corrupted.");
       }
     }
+
+    //Append some corrupted text to the end of the aggregated file.
+    URI logUri = URI.create("file:///" + remoteAppLogFile.toUri().toString());
+    Files.write(Paths.get(logUri),
+        "corrupt_text".getBytes(), StandardOpenOption.APPEND);
+    try {
+      // Trying to read a corrupted log file created above should cause
+      // log reading to fail below with an IOException.
+      logReader = new LogReader(conf, remoteAppLogFile);
+      Assert.fail("Expect IOException from reading corrupt aggregated logs.");
+    } catch (IOException ioe) {
+      DataInputStream dIS = logReader.next(rLogKey);
+      Assert.assertNull("Input stream not available for reading", dIS);
+    }
   }
 
   private void writeSrcFileAndALog(Path srcFilePath, String fileName, final long length,