Pārlūkot izejas kodu

YARN-3882. AggregatedLogFormat should close aclScanner and ownerScanner
after create them. Contributed by zhihai xu

(cherry picked from commit 688617d6d7e6377a37682b5676b805cc6e8cf3f0)

Xuan 10 gadi atpakaļ
vecāks
revīzija
09639ac6b4

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -541,6 +541,9 @@ Release 2.8.0 - UNRELEASED
     YARN-3875. FSSchedulerNode#reserveResource() doesn't print Application Id
     properly in log. (Bibin A Chundatt via devaraj)
 
+    YARN-3882. AggregatedLogFormat should close aclScanner and ownerScanner
+    after create them. (zhihai xu via xgong)
+
 Release 2.7.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 46 - 37
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java

@@ -489,18 +489,23 @@ public class AggregatedLogFormat {
      * @throws IOException
      */
     public String getApplicationOwner() throws IOException {
-      TFile.Reader.Scanner ownerScanner = reader.createScanner();
-      LogKey key = new LogKey();
-      while (!ownerScanner.atEnd()) {
-        TFile.Reader.Scanner.Entry entry = ownerScanner.entry();
-        key.readFields(entry.getKeyStream());
-        if (key.toString().equals(APPLICATION_OWNER_KEY.toString())) {
-          DataInputStream valueStream = entry.getValueStream();
-          return valueStream.readUTF();
+      TFile.Reader.Scanner ownerScanner = null;
+      try {
+        ownerScanner = reader.createScanner();
+        LogKey key = new LogKey();
+        while (!ownerScanner.atEnd()) {
+          TFile.Reader.Scanner.Entry entry = ownerScanner.entry();
+          key.readFields(entry.getKeyStream());
+          if (key.toString().equals(APPLICATION_OWNER_KEY.toString())) {
+            DataInputStream valueStream = entry.getValueStream();
+            return valueStream.readUTF();
+          }
+          ownerScanner.advance();
         }
-        ownerScanner.advance();
+        return null;
+      } finally {
+        IOUtils.cleanup(LOG, ownerScanner);
       }
-      return null;
     }
 
     /**
@@ -513,38 +518,42 @@ public class AggregatedLogFormat {
     public Map<ApplicationAccessType, String> getApplicationAcls()
         throws IOException {
       // TODO Seek directly to the key once a comparator is specified.
-      TFile.Reader.Scanner aclScanner = reader.createScanner();
-      LogKey key = new LogKey();
-      Map<ApplicationAccessType, String> acls =
-          new HashMap<ApplicationAccessType, String>();
-      while (!aclScanner.atEnd()) {
-        TFile.Reader.Scanner.Entry entry = aclScanner.entry();
-        key.readFields(entry.getKeyStream());
-        if (key.toString().equals(APPLICATION_ACL_KEY.toString())) {
-          DataInputStream valueStream = entry.getValueStream();
-          while (true) {
-            String appAccessOp = null;
-            String aclString = null;
-            try {
-              appAccessOp = valueStream.readUTF();
-            } catch (EOFException e) {
-              // Valid end of stream.
-              break;
-            }
-            try {
-              aclString = valueStream.readUTF();
-            } catch (EOFException e) {
-              throw new YarnRuntimeException("Error reading ACLs", e);
+      TFile.Reader.Scanner aclScanner = null;
+      try {
+        aclScanner = reader.createScanner();
+        LogKey key = new LogKey();
+        Map<ApplicationAccessType, String> acls =
+            new HashMap<ApplicationAccessType, String>();
+        while (!aclScanner.atEnd()) {
+          TFile.Reader.Scanner.Entry entry = aclScanner.entry();
+          key.readFields(entry.getKeyStream());
+          if (key.toString().equals(APPLICATION_ACL_KEY.toString())) {
+            DataInputStream valueStream = entry.getValueStream();
+            while (true) {
+              String appAccessOp = null;
+              String aclString = null;
+              try {
+                appAccessOp = valueStream.readUTF();
+              } catch (EOFException e) {
+                // Valid end of stream.
+                break;
+              }
+              try {
+                aclString = valueStream.readUTF();
+              } catch (EOFException e) {
+                throw new YarnRuntimeException("Error reading ACLs", e);
+              }
+              acls.put(ApplicationAccessType.valueOf(appAccessOp), aclString);
             }
-            acls.put(ApplicationAccessType.valueOf(appAccessOp), aclString);
           }
-
+          aclScanner.advance();
         }
-        aclScanner.advance();
+        return acls;
+      } finally {
+        IOUtils.cleanup(LOG, aclScanner);
       }
-      return acls;
     }
-    
+
     /**
      * Read the next key and return the value-stream.
      *