Browse Source

HADOOP-15973. Configuration: Included properties are not cached if resource is a stream. Contributed by Eric Payne

Jason Lowe 6 years ago
parent
commit
3961690037

+ 49 - 29
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/Configuration.java

@@ -41,7 +41,6 @@ import java.io.Writer;
 import java.lang.ref.WeakReference;
 import java.lang.ref.WeakReference;
 import java.net.InetSocketAddress;
 import java.net.InetSocketAddress;
 import java.net.JarURLConnection;
 import java.net.JarURLConnection;
-import java.net.MalformedURLException;
 import java.net.URL;
 import java.net.URL;
 import java.net.URLConnection;
 import java.net.URLConnection;
 import java.util.ArrayList;
 import java.util.ArrayList;
@@ -2972,36 +2971,15 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
     try {
     try {
       Object resource = wrapper.getResource();
       Object resource = wrapper.getResource();
       name = wrapper.getName();
       name = wrapper.getName();
-      XMLStreamReader2 reader = null;
       boolean returnCachedProperties = false;
       boolean returnCachedProperties = false;
-      boolean isRestricted = wrapper.isParserRestricted();
-
-      if (resource instanceof URL) {                  // an URL resource
-        reader = (XMLStreamReader2)parse((URL)resource, isRestricted);
-      } else if (resource instanceof String) {        // a CLASSPATH resource
-        URL url = getResource((String)resource);
-        reader = (XMLStreamReader2)parse(url, isRestricted);
-      } else if (resource instanceof Path) {          // a file resource
-        // Can't use FileSystem API or we get an infinite loop
-        // since FileSystem uses Configuration API.  Use java.io.File instead.
-        File file = new File(((Path)resource).toUri().getPath())
-          .getAbsoluteFile();
-        if (file.exists()) {
-          if (!quiet) {
-            LOG.debug("parsing File " + file);
-          }
-          reader = (XMLStreamReader2)parse(new BufferedInputStream(
-              new FileInputStream(file)), ((Path)resource).toString(),
-              isRestricted);
-        }
-      } else if (resource instanceof InputStream) {
-        reader = (XMLStreamReader2)parse((InputStream)resource, null,
-            isRestricted);
+
+      if (resource instanceof InputStream) {
         returnCachedProperties = true;
         returnCachedProperties = true;
       } else if (resource instanceof Properties) {
       } else if (resource instanceof Properties) {
         overlay(properties, (Properties)resource);
         overlay(properties, (Properties)resource);
       }
       }
 
 
+      XMLStreamReader2 reader = getStreamReader(wrapper, quiet);
       if (reader == null) {
       if (reader == null) {
         if (quiet) {
         if (quiet) {
           return null;
           return null;
@@ -3034,6 +3012,36 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
     }
     }
   }
   }
 
 
+  private XMLStreamReader2 getStreamReader(Resource wrapper, boolean quiet)
+      throws XMLStreamException, IOException {
+    Object resource = wrapper.getResource();
+    boolean isRestricted = wrapper.isParserRestricted();
+    XMLStreamReader2 reader = null;
+    if (resource instanceof URL) {                  // an URL resource
+      reader  = (XMLStreamReader2)parse((URL)resource, isRestricted);
+    } else if (resource instanceof String) {        // a CLASSPATH resource
+      URL url = getResource((String)resource);
+      reader = (XMLStreamReader2)parse(url, isRestricted);
+    } else if (resource instanceof Path) {          // a file resource
+      // Can't use FileSystem API or we get an infinite loop
+      // since FileSystem uses Configuration API.  Use java.io.File instead.
+      File file = new File(((Path)resource).toUri().getPath())
+        .getAbsoluteFile();
+      if (file.exists()) {
+        if (!quiet) {
+          LOG.debug("parsing File " + file);
+        }
+        reader = (XMLStreamReader2)parse(new BufferedInputStream(
+            new FileInputStream(file)), ((Path)resource).toString(),
+            isRestricted);
+      }
+    } else if (resource instanceof InputStream) {
+      reader = (XMLStreamReader2)parse((InputStream)resource, null,
+          isRestricted);
+    }
+    return reader;
+  }
+
   private static class ParsedItem {
   private static class ParsedItem {
     String name;
     String name;
     String key;
     String key;
@@ -3095,7 +3103,7 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
       return results;
       return results;
     }
     }
 
 
-    private void handleStartElement() throws MalformedURLException {
+    private void handleStartElement() throws XMLStreamException, IOException {
       switch (reader.getLocalName()) {
       switch (reader.getLocalName()) {
       case "property":
       case "property":
         handleStartProperty();
         handleStartProperty();
@@ -3151,10 +3159,11 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
       }
       }
     }
     }
 
 
-    private void handleInclude() throws MalformedURLException {
+    private void handleInclude() throws XMLStreamException, IOException {
       // Determine href for xi:include
       // Determine href for xi:include
       confInclude = null;
       confInclude = null;
       int attrCount = reader.getAttributeCount();
       int attrCount = reader.getAttributeCount();
+      List<ParsedItem> items;
       for (int i = 0; i < attrCount; i++) {
       for (int i = 0; i < attrCount; i++) {
         String attrName = reader.getAttributeLocalName(i);
         String attrName = reader.getAttributeLocalName(i);
         if ("href".equals(attrName)) {
         if ("href".equals(attrName)) {
@@ -3178,7 +3187,12 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
         // This is only called recursively while the lock is already held
         // This is only called recursively while the lock is already held
         // by this thread, but synchronizing avoids a findbugs warning.
         // by this thread, but synchronizing avoids a findbugs warning.
         synchronized (Configuration.this) {
         synchronized (Configuration.this) {
-          loadResource(properties, classpathResource, quiet);
+          XMLStreamReader2 includeReader =
+              getStreamReader(classpathResource, quiet);
+          if (includeReader == null) {
+            throw new RuntimeException(classpathResource + " not found");
+          }
+          items = new Parser(includeReader, classpathResource, quiet).parse();
         }
         }
       } else {
       } else {
         URL url;
         URL url;
@@ -3204,9 +3218,15 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
         // This is only called recursively while the lock is already held
         // This is only called recursively while the lock is already held
         // by this thread, but synchronizing avoids a findbugs warning.
         // by this thread, but synchronizing avoids a findbugs warning.
         synchronized (Configuration.this) {
         synchronized (Configuration.this) {
-          loadResource(properties, uriResource, quiet);
+          XMLStreamReader2 includeReader =
+              getStreamReader(uriResource, quiet);
+          if (includeReader == null) {
+            throw new RuntimeException(uriResource + " not found");
+          }
+          items = new Parser(includeReader, uriResource, quiet).parse();
         }
         }
       }
       }
+      results.addAll(items);
     }
     }
 
 
     void handleEndElement() throws IOException {
     void handleEndElement() throws IOException {

+ 100 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestConfiguration.java

@@ -22,6 +22,7 @@ import java.io.BufferedWriter;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.FileOutputStream;
 import java.io.FileWriter;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.io.IOException;
@@ -930,6 +931,105 @@ public class TestConfiguration {
     tearDown();
     tearDown();
   }
   }
 
 
+  // When a resource is parsed as an input stream the first time, included
+  // properties are saved within the config. However, the included properties
+  // are not cached in the resource object. So, if an additional resource is
+  // added after the config is parsed the first time, the config loses the
+  // prperties that were included from the first resource.
+  @Test
+  public void testIncludesFromInputStreamWhenResourceAdded() throws Exception {
+    tearDown();
+
+    // CONFIG includes CONFIG2. CONFIG2 includes CONFIG_FOR_ENUM
+    out=new BufferedWriter(new FileWriter(CONFIG_FOR_ENUM));
+    startConfig();
+    appendProperty("e", "SecondLevelInclude");
+    appendProperty("f", "SecondLevelInclude");
+    endConfig();
+
+    out=new BufferedWriter(new FileWriter(CONFIG2));
+    startConfig();
+    startInclude(CONFIG_FOR_ENUM);
+    endInclude();
+    appendProperty("c","FirstLevelInclude");
+    appendProperty("d","FirstLevelInclude");
+    endConfig();
+
+    out=new BufferedWriter(new FileWriter(CONFIG));
+    startConfig();
+    startInclude(CONFIG2);
+    endInclude();
+    appendProperty("a", "1");
+    appendProperty("b", "2");
+    endConfig();
+
+    // Add CONFIG as an InputStream resource.
+    File file = new File(CONFIG);
+    BufferedInputStream bis =
+        new BufferedInputStream(new FileInputStream(file));
+    conf.addResource(bis);
+
+    // The first time the conf is parsed, verify that all properties were read
+    // from all levels of includes.
+    assertEquals("1", conf.get("a"));
+    assertEquals("2", conf.get("b"));
+    assertEquals("FirstLevelInclude", conf.get("c"));
+    assertEquals("FirstLevelInclude", conf.get("d"));
+    assertEquals("SecondLevelInclude", conf.get("e"));
+    assertEquals("SecondLevelInclude", conf.get("f"));
+
+    // Add another resource to the conf.
+    out=new BufferedWriter(new FileWriter(CONFIG_MULTI_BYTE));
+    startConfig();
+    appendProperty("g", "3");
+    appendProperty("h", "4");
+    endConfig();
+
+    Path fileResource = new Path(CONFIG_MULTI_BYTE);
+    conf.addResource(fileResource);
+
+    // Verify that all properties were read from all levels of includes the
+    // second time the conf is parsed.
+    assertEquals("1", conf.get("a"));
+    assertEquals("2", conf.get("b"));
+    assertEquals("FirstLevelInclude", conf.get("c"));
+    assertEquals("FirstLevelInclude", conf.get("d"));
+    assertEquals("SecondLevelInclude", conf.get("e"));
+    assertEquals("SecondLevelInclude", conf.get("f"));
+    assertEquals("3", conf.get("g"));
+    assertEquals("4", conf.get("h"));
+
+    tearDown();
+  }
+
+  @Test
+  public void testOrderOfDuplicatePropertiesWithInclude() throws Exception {
+    tearDown();
+
+    // Property "a" is set to different values inside and outside of includes.
+    out=new BufferedWriter(new FileWriter(CONFIG2));
+    startConfig();
+    appendProperty("a", "a-InsideInclude");
+    appendProperty("b", "b-InsideInclude");
+    endConfig();
+
+    out=new BufferedWriter(new FileWriter(CONFIG));
+    startConfig();
+    appendProperty("a","a-OutsideInclude");
+    startInclude(CONFIG2);
+    endInclude();
+    appendProperty("b","b-OutsideInclude");
+    endConfig();
+
+    Path fileResource = new Path(CONFIG);
+    conf.addResource(fileResource);
+
+    assertEquals("a-InsideInclude", conf.get("a"));
+    assertEquals("b-OutsideInclude", conf.get("b"));
+
+    tearDown();
+  }
+
   @Test
   @Test
   public void testRelativeIncludes() throws Exception {
   public void testRelativeIncludes() throws Exception {
     tearDown();
     tearDown();