Browse Source

YARN-9336. JobHistoryServer leaks CLOSE_WAIT tcp connections when using LogAggregationIndexedFileController. Contributed by Tarun Parimi.

Rohith Sharma K S 6 năm trước cách đây
mục cha
commit
c24af4b0d6

+ 32 - 32
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/IndexedFileAggregatedLogsBlock.java

@@ -202,38 +202,38 @@ public class IndexedFileAggregatedLogsBlock extends LogAggregationHtmlBlock {
     Decompressor decompressor = compressName.getDecompressor();
     FileContext fileContext = FileContext.getFileContext(
         thisNodeFile.getPath().toUri(), conf);
-    FSDataInputStream fsin = fileContext.open(thisNodeFile.getPath());
-    int bufferSize = 65536;
-    for (IndexedFileLogMeta candidate : candidates) {
-      if (candidate.getLastModifiedTime() < startTime
-          || candidate.getLastModifiedTime() > endTime) {
-        continue;
-      }
-      byte[] cbuf = new byte[bufferSize];
-      InputStream in = null;
-      try {
-        in = compressName.createDecompressionStream(
-            new BoundedRangeFileInputStream(fsin, candidate.getStartIndex(),
-                candidate.getFileCompressedSize()), decompressor,
-            LogAggregationIndexedFileController.getFSInputBufferSize(conf));
-        long logLength = candidate.getFileSize();
-        html.pre().__("\n\n").__();
-        html.p().__("Log Type: " + candidate.getFileName()).__();
-        html.p().__(
-            "Log Upload Time: " + Times.format(candidate.getLastModifiedTime()))
-            .__();
-        html.p().__("Log Length: " + Long.toString(logLength)).__();
-
-        long[] range = checkParseRange(html, start, end, startTime, endTime,
-            logLength, candidate.getFileName());
-        processContainerLog(html, range, in, bufferSize, cbuf);
-
-        foundLog = true;
-      } catch (Exception ex) {
-        LOG.error("Error getting logs for " + logEntity, ex);
-        continue;
-      } finally {
-        IOUtils.closeQuietly(in);
+    try (FSDataInputStream fsin = fileContext.open(thisNodeFile.getPath())) {
+      int bufferSize = 65536;
+      for (IndexedFileLogMeta candidate : candidates) {
+        if (candidate.getLastModifiedTime() < startTime
+            || candidate.getLastModifiedTime() > endTime) {
+          continue;
+        }
+        byte[] cbuf = new byte[bufferSize];
+        InputStream in = null;
+        try {
+          in = compressName.createDecompressionStream(
+              new BoundedRangeFileInputStream(fsin, candidate.getStartIndex(),
+                  candidate.getFileCompressedSize()), decompressor,
+              LogAggregationIndexedFileController.getFSInputBufferSize(conf));
+          long logLength = candidate.getFileSize();
+          html.pre().__("\n\n").__();
+          html.p().__("Log Type: " + candidate.getFileName()).__();
+          html.p().__("Log Upload Time: " +
+              Times.format(candidate.getLastModifiedTime())).__();
+          html.p().__("Log Length: " + Long.toString(logLength)).__();
+
+          long[] range = checkParseRange(html, start, end, startTime, endTime,
+              logLength, candidate.getFileName());
+          processContainerLog(html, range, in, bufferSize, cbuf);
+
+          foundLog = true;
+        } catch (Exception ex) {
+          LOG.error("Error getting logs for " + logEntity, ex);
+          continue;
+        } finally {
+          IOUtils.closeQuietly(in);
+        }
       }
     }
     return foundLog;