Browse Source

HADOOP-3299. CompositeInputFormat should configure the sub-input
formats. Contributed by Chris Douglas.


git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@654308 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley 17 years ago
parent
commit
bdf84a607d

+ 3 - 0
CHANGES.txt

@@ -194,6 +194,9 @@ Trunk (unreleased changes)
     HADOOP-3085. Catch Exception in metrics util classes to ensure that
     misconfigured metrics don't prevent others from updating. (cdouglas)
 
+    HADOOP-3299. CompositeInputFormat should configure the sub-input
+    formats. (cdouglas via omalley)
+
 Release 0.17.0 - Unreleased
 
   INCOMPATIBLE CHANGES

+ 1 - 3
src/java/org/apache/hadoop/mapred/join/CompositeInputFormat.java

@@ -72,9 +72,7 @@ public class CompositeInputFormat<K extends WritableComparable>
   public void setFormat(JobConf job) throws IOException {
     addDefaults();
     addUserIdentifiers(job);
-    Class<? extends WritableComparator> cmpcl =
-      job.getClass("mapred.join.keycomparator", null, WritableComparator.class);
-    root = Parser.parse(job.get("mapred.join.expr", null), cmpcl);
+    root = Parser.parse(job.get("mapred.join.expr", null), job);
   }
 
   /**

+ 10 - 9
src/java/org/apache/hadoop/mapred/join/Parser.java

@@ -233,7 +233,7 @@ public class Parser {
     protected void setKeyComparator(Class<? extends WritableComparator> cmpcl) {
       this.cmpcl = cmpcl;
     }
-    abstract void parse(List<Token> args) throws IOException;
+    abstract void parse(List<Token> args, JobConf job) throws IOException;
   }
 
   /**
@@ -260,7 +260,7 @@ public class Parser {
      * Let the first actual define the InputFormat and the second define
      * the <tt>mapred.input.dir</tt> property.
      */
-    public void parse(List<Token> ll) throws IOException {
+    public void parse(List<Token> ll, JobConf job) throws IOException {
       StringBuilder sb = new StringBuilder();
       Iterator<Token> i = ll.iterator();
       while (i.hasNext()) {
@@ -269,7 +269,7 @@ public class Parser {
           try {
             inf = (InputFormat)ReflectionUtils.newInstance(
                 Class.forName(sb.toString()).asSubclass(InputFormat.class),
-                null);
+                job);
           } catch (ClassNotFoundException e) {
             throw (IOException)new IOException().initCause(e);
           } catch (IllegalArgumentException e) {
@@ -424,7 +424,7 @@ public class Parser {
     /**
      * Parse a list of comma-separated nodes.
      */
-    public void parse(List<Token> args) throws IOException {
+    public void parse(List<Token> args, JobConf job) throws IOException {
       ListIterator<Token> i = args.listIterator();
       while (i.hasNext()) {
         Token t = i.next();
@@ -447,7 +447,7 @@ public class Parser {
     }
   }
 
-  private static Token reduce(Stack<Token> st) throws IOException {
+  private static Token reduce(Stack<Token> st, JobConf job) throws IOException {
     LinkedList<Token> args = new LinkedList<Token>();
     while (!st.isEmpty() && !TType.LPAREN.equals(st.peek().getType())) {
       args.addFirst(st.pop());
@@ -460,7 +460,7 @@ public class Parser {
       throw new IOException("Identifier expected");
     }
     Node n = Node.forIdent(st.pop().getStr());
-    n.parse(args);
+    n.parse(args, job);
     return new NodeToken(n);
   }
 
@@ -468,17 +468,18 @@ public class Parser {
    * Given an expression and an optional comparator, build a tree of
    * InputFormats using the comparator to sort keys.
    */
-  static Node parse(String expr, Class<? extends WritableComparator> cmpcl)
-      throws IOException {
+  static Node parse(String expr, JobConf job) throws IOException {
     if (null == expr) {
       throw new IOException("Expression is null");
     }
+    Class<? extends WritableComparator> cmpcl =
+      job.getClass("mapred.join.keycomparator", null, WritableComparator.class);
     Lexer lex = new Lexer(expr);
     Stack<Token> st = new Stack<Token>();
     Token tok;
     while ((tok = lex.next()) != null) {
       if (TType.RPAREN.equals(tok.getType())) {
-        st.push(reduce(st));
+        st.push(reduce(st, job));
       } else {
         st.push(tok);
       }

+ 51 - 0
src/test/org/apache/hadoop/mapred/join/ConfigurableInputFormat.java

@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.join;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+public class ConfigurableInputFormat implements InputFormat, JobConfigurable {
+  boolean configured = false;
+
+  public ConfigurableInputFormat() { }
+
+  public void configure(JobConf job) {
+    configured = true;
+  }
+  public void validateInput(JobConf job) throws IOException {
+    if (!configured)
+      throw new IOException("Failed to configure child InputFormat");
+  }
+
+  public InputSplit[] getSplits(JobConf job, int numSplits)
+      throws IOException {
+    return null;
+  }
+
+  public RecordReader getRecordReader(
+      InputSplit split, JobConf job, Reporter reporter) throws IOException {
+    return null;
+  }
+}

+ 8 - 0
src/test/org/apache/hadoop/mapred/join/TestDatamerge.java

@@ -243,4 +243,12 @@ public class TestDatamerge extends TestCase {
   public void testSimpleOverride() throws Exception {
     joinAs("override", OverrideChecker.class);
   }
+
+  public void testConfiguredInputFormat() throws Exception {
+    JobConf conf = new JobConf();
+    conf.set("mapred.join.expr", CompositeInputFormat.compose(
+          ConfigurableInputFormat.class, "/dingos"));
+    CompositeInputFormat cif = new CompositeInputFormat();
+    cif.validateInput(conf);
+  }
 }