Browse Source

Permit configuration to specify higher replication for job submission files. Also reduce complaints when a file's replication is greater than the size of the cluster.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@397601 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 19 years ago
parent
commit
ee3b636ab5

+ 4 - 0
CHANGES.txt

@@ -112,6 +112,10 @@ Trunk (unreleased)
     the replication count of individual files.  Also fix a few
     replication-related bugs. (Konstantin Shvachko via cutting)
 
+30. Permit specification of a higher replication level for job
+    submission files (job.xml and job.jar).  This helps with large
+    clusters, since these files are read by every node.
+
 
 Release 0.1.1 - 2006-04-08
 

+ 8 - 0
conf/hadoop-default.xml

@@ -258,6 +258,14 @@
   take priority over this setting.</description>
 </property>
 
+<property>
+  <name>mapred.submit.replication</name>
+  <value>10</value>
+  <description>The replication level for submitted job files.  This
+  should be around the square root of the number of nodes.
+  </description>
+</property>
+
 
 <!-- ipc properties -->
 

+ 7 - 0
src/java/org/apache/hadoop/dfs/FSNamesystem.java

@@ -1392,6 +1392,13 @@ class FSNamesystem implements FSConstants {
      * @return array of DatanodeInfo instances uses as targets.
      */
     DatanodeInfo[] chooseTargets(int desiredReplicates, TreeSet forbiddenNodes, UTF8 clientMachine) {
+        if (desiredReplicates > datanodeMap.size()) {
+          LOG.warning("Replication requested of "+desiredReplicates
+                      +" is larger than cluster size ("+datanodeMap.size()
+                      +"). Using cluster size.");
+          desiredReplicates  = datanodeMap.size();
+        }
+
         TreeSet alreadyChosen = new TreeSet();
         Vector targets = new Vector();
 

+ 11 - 0
src/java/org/apache/hadoop/fs/FileSystem.java

@@ -178,6 +178,17 @@ public abstract class FileSystem extends Configured {
                     (short)getConf().getInt("dfs.replication", 3));
     }
 
+    /**
+     * Opens an FSDataOutputStream at the indicated Path.
+     * Files are overwritten by default.
+     */
+    public FSDataOutputStream create(Path f, short replication)
+      throws IOException {
+      return create(f, true, 
+                    getConf().getInt("io.file.buffer.size", 4096),
+                    replication);
+    }
+
     /**
      * Opens an FSDataOutputStream at the indicated Path.
      * @param f the file name to open

+ 4 - 1
src/java/org/apache/hadoop/mapred/JobClient.java

@@ -241,9 +241,12 @@ public class JobClient implements MRConstants {
         FileSystem localFs = FileSystem.getNamed("local", job);
         FileSystem fs = getFs();
 
+        short replication = (short)job.getInt("mapred.submit.replication", 10);
+
         if (originalJarPath != null) {           // copy jar to JobTracker's fs
           job.setJar(submitJarFile.toString());
           fs.copyFromLocalFile(new Path(originalJarPath), submitJarFile);
+          fs.setReplication(submitJarFile, replication);
         }
 
         // Set the user's name and working directory
@@ -257,7 +260,7 @@ public class JobClient implements MRConstants {
         job.getOutputFormat().checkOutputSpecs(fs, job);
 
         // Write job file to JobTracker's fs        
-        FSDataOutputStream out = fs.create(submitJobFile);
+        FSDataOutputStream out = fs.create(submitJobFile, replication);
         try {
           job.write(out);
         } finally {