Bladeren bron

HADOOP-4293. Make Configuration Writable and remove unreleased
WritableJobConf. Configuration.write is renamed to writeXml. (omalley)


git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@700291 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley 16 jaren geleden
bovenliggende
commit
d8b3f38ae5

+ 3 - 0
CHANGES.txt

@@ -108,6 +108,9 @@ Release 0.19.0 - Unreleased
     HADOOP-3938. Disk space quotas for HDFS. This is similar to namespace
     quotas in 0.18. (rangadi)
 
+    HADOOP-4293. Make Configuration Writable and remove unreleased 
+    WritableJobConf. Configuration.write is renamed to writeXml. (omalley)
+
   NEW FEATURES
 
     HADOOP-3341. Allow streaming jobs to specify the field separator for map

+ 28 - 3
src/core/org/apache/hadoop/conf/Configuration.java

@@ -19,6 +19,8 @@
 package org.apache.hadoop.conf;
 
 import java.io.BufferedInputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
@@ -54,6 +56,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.w3c.dom.DOMException;
 import org.w3c.dom.Document;
@@ -128,7 +132,8 @@ import org.xml.sax.SAXException;
  * <tt>${<i>user.name</i>}</tt> would then ordinarily be resolved to the value
  * of the System property with that name.
  */
-public class Configuration implements Iterable<Map.Entry<String,String>> {
+public class Configuration implements Iterable<Map.Entry<String,String>>,
+                                      Writable {
   private static final Log LOG =
     LogFactory.getLog(Configuration.class);
 
@@ -1063,7 +1068,7 @@ public class Configuration implements Iterable<Map.Entry<String,String>> {
    * 
    * @param out the output stream to write to.
    */
-  public void write(OutputStream out) throws IOException {
+  public void writeXml(OutputStream out) throws IOException {
     Properties properties = getProps();
     try {
       Document doc =
@@ -1154,7 +1159,27 @@ public class Configuration implements Iterable<Map.Entry<String,String>> {
 
   /** For debugging.  List non-default properties to the terminal and exit. */
   public static void main(String[] args) throws Exception {
-    new Configuration().write(System.out);
+    new Configuration().writeXml(System.out);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    clear();
+    int size = WritableUtils.readVInt(in);
+    for(int i=0; i < size; ++i) {
+      set(org.apache.hadoop.io.Text.readString(in), 
+          org.apache.hadoop.io.Text.readString(in));
+    }
+  }
+
+  //@Override
+  public void write(DataOutput out) throws IOException {
+    Properties props = getProps();
+    WritableUtils.writeVInt(out, props.size());
+    for(Map.Entry<Object, Object> item: props.entrySet()) {
+      org.apache.hadoop.io.Text.writeString(out, (String) item.getKey());
+      org.apache.hadoop.io.Text.writeString(out, (String) item.getValue());
+    }
   }
 
 }

+ 1 - 1
src/mapred/org/apache/hadoop/mapred/JobClient.java

@@ -808,7 +808,7 @@ public class JobClient extends Configured implements MRConstants, Tool  {
         new FsPermission(JOB_FILE_PERMISSION));
 
     try {
-      job.write(out);
+      job.writeXml(out);
     } finally {
       out.close();
     }

+ 3 - 3
src/mapred/org/apache/hadoop/mapred/JobHistory.java

@@ -860,7 +860,7 @@ public class JobHistory {
       FileOutputStream jobOut = null;
       try {
         jobOut = new FileOutputStream(localJobFile);
-        jobConf.write(jobOut);
+        jobConf.writeXml(jobOut);
         if (LOG.isDebugEnabled()) {
           LOG.debug("Job conf for " + jobId + " stored at " 
                     + localJobFile.getAbsolutePath());
@@ -895,14 +895,14 @@ public class JobHistory {
           fs = new Path(LOG_DIR).getFileSystem(jobConf);
           if (!fs.exists(jobFilePath)) {
             jobFileOut = fs.create(jobFilePath);
-            jobConf.write(jobFileOut);
+            jobConf.writeXml(jobFileOut);
             jobFileOut.close();
           }
         } 
         if (userLogDir != null) {
           fs = new Path(userLogDir).getFileSystem(jobConf);
           jobFileOut = fs.create(userJobFilePath);
-          jobConf.write(jobFileOut);
+          jobConf.writeXml(jobFileOut);
         }
         if (LOG.isDebugEnabled()) {
           LOG.debug("Job conf for " + jobId + " stored at " 

+ 1 - 1
src/mapred/org/apache/hadoop/mapred/TaskRunner.java

@@ -187,7 +187,7 @@ abstract class TaskRunner extends Thread {
         localFs.delete(localTaskFile, true);
         OutputStream out = localFs.create(localTaskFile);
         try {
-          conf.write(out);
+          conf.writeXml(out);
         } finally {
           out.close();
         }

+ 2 - 2
src/mapred/org/apache/hadoop/mapred/TaskTracker.java

@@ -765,7 +765,7 @@ public class TaskTracker
           localJobConf.setJar(localJarFile.toString());
           OutputStream out = localFs.create(localJobFile);
           try {
-            localJobConf.write(out);
+            localJobConf.writeXml(out);
           } finally {
             out.close();
           }
@@ -1842,7 +1842,7 @@ public class TaskTracker
       }
       OutputStream out = localFs.create(localTaskFile);
       try {
-        localJobConf.write(out);
+        localJobConf.writeXml(out);
       } finally {
         out.close();
       }

+ 0 - 99
src/mapred/org/apache/hadoop/mapred/WritableJobConf.java

@@ -1,99 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.mapred;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Writable;
-
-/**
- * Writable JobConf is the Writable version of the JobConf. 
- */
-public class WritableJobConf extends JobConf implements Writable {
-
-  /**
-   * This C.tor does not load default configuration files, 
-   * hadoop-(default|site).xml
-   */
-  public WritableJobConf() {
-    super(false); //do not load defaults
-  }
-
-  public WritableJobConf(boolean loadDefaults) {
-    super(loadDefaults);    
-  }
-
-  public WritableJobConf(Class<?> exampleClass) {
-    super(exampleClass);
-  }
-
-  public WritableJobConf(Configuration conf, Class<?> exampleClass) {
-    super(conf, exampleClass);
-  }
-
-  public WritableJobConf(Configuration conf) {
-    super(conf);
-  }
-
-  public WritableJobConf(Path config) {
-    super(config);
-  }
-
-  public WritableJobConf(String config) {
-    super(config);
-  }
-
-  @Override
-  public void readFields(final DataInput in) throws IOException {
-    if(in instanceof InputStream) {
-      this.addResource((InputStream)in);
-    } else {
-      this.addResource(new  InputStream() {
-        @Override
-        public int read() throws IOException {
-          return in.readByte();
-        }
-      });
-    }
-    
-    this.get("foo"); //so that getProps() is called, before returining back.
-  }
-
-  @Override
-  public void write(final DataOutput out) throws IOException {
-    if(out instanceof OutputStream) {
-      write((OutputStream)out);
-    }
-    else {
-      write(new OutputStream() {
-        @Override
-        public void write(int b) throws IOException {
-          out.writeByte(b);
-        }
-      });
-    }
-  }
-
-}

+ 8 - 8
src/mapred/org/apache/hadoop/mapred/lib/Chain.java

@@ -122,8 +122,8 @@ class Chain {
   private static JobConf getChainElementConf(JobConf jobConf, String confKey) {
     JobConf conf;
     try {
-      Stringifier<WritableJobConf> stringifier =
-        new DefaultStringifier<WritableJobConf>(jobConf, WritableJobConf.class);
+      Stringifier<JobConf> stringifier =
+        new DefaultStringifier<JobConf>(jobConf, JobConf.class);
       conf = stringifier.fromString(jobConf.get(confKey, null));
     } catch (IOException ioex) {
       throw new RuntimeException(ioex);
@@ -233,11 +233,11 @@ class Chain {
                         Object.class);
 
     // serialize the private mapper jobconf in the chain jobconf.
-    Stringifier<WritableJobConf> stringifier =
-      new DefaultStringifier<WritableJobConf>(jobConf, WritableJobConf.class);
+    Stringifier<JobConf> stringifier =
+      new DefaultStringifier<JobConf>(jobConf, JobConf.class);
     try {
       jobConf.set(prefix + CHAIN_MAPPER_CONFIG + index,
-                  stringifier.toString(new WritableJobConf(mapperConf)));
+                  stringifier.toString(new JobConf(mapperConf)));
     }
     catch (IOException ioEx) {
       throw new RuntimeException(ioEx);
@@ -300,11 +300,11 @@ class Chain {
                          Object.class);
 
     // serialize the private mapper jobconf in the chain jobconf.
-    Stringifier<WritableJobConf> stringifier =
-      new DefaultStringifier<WritableJobConf>(jobConf, WritableJobConf.class);
+    Stringifier<JobConf> stringifier =
+      new DefaultStringifier<JobConf>(jobConf, JobConf.class);
     try {
       jobConf.set(prefix + CHAIN_REDUCER_CONFIG,
-                  stringifier.toString(new WritableJobConf(reducerConf)));
+                  stringifier.toString(new JobConf(reducerConf)));
     }
     catch (IOException ioEx) {
       throw new RuntimeException(ioEx);

+ 3 - 3
src/test/org/apache/hadoop/mapred/TestWritableJobConf.java

@@ -77,13 +77,13 @@ public class TestWritableJobConf extends TestCase {
   }
 
   public void testEmptyConfiguration() throws Exception {
-    WritableJobConf conf = new WritableJobConf();
+    JobConf conf = new JobConf();
     Configuration deser = serDeser(conf);
     assertEquals(conf, deser);
   }
 
   public void testNonEmptyConfiguration() throws Exception {
-    WritableJobConf conf = new WritableJobConf();
+    JobConf conf = new JobConf();
     conf.set("a", "A");
     conf.set("b", "B");
     Configuration deser = serDeser(conf);
@@ -91,7 +91,7 @@ public class TestWritableJobConf extends TestCase {
   }
 
   public void testConfigurationWithDefaults() throws Exception {
-    WritableJobConf conf = new WritableJobConf(false);
+    JobConf conf = new JobConf(false);
     conf.set("a", "A");
     conf.set("b", "B");
     Configuration deser = serDeser(conf);

+ 1 - 1
src/test/org/apache/hadoop/mapred/pipes/TestPipes.java

@@ -231,7 +231,7 @@ public class TestPipes extends TestCase {
     local.delete(outDir, true);
     local.mkdirs(outDir);
     out = local.create(jobXml);
-    job.write(out);
+    job.writeXml(out);
     out.close();
     System.err.println("About to run: Submitter -conf " + jobXml + 
                        " -input " + inDir + " -output " + outDir +