
commit 51be5c3d61cbc7960174493428fbaa41d5fbe84d
Author: Chris Douglas <cdouglas@apache.org>
Date: Fri Oct 1 01:49:51 2010 -0700

Change client-side enforcement of limit on locations per split
to be advisory. Truncate on client, optionally fail job at JobTracker if
exceeded. Added mapreduce.job.max.split.locations property.

+++ b/YAHOO-CHANGES.txt
+ Change client-side enforcement of limit on locations per split
+ to be advisory. Truncate on client, optionally fail job at JobTracker if
+ exceeded. Added mapreduce.job.max.split.locations property. (cdouglas)
+


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-security-patches@1077730 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley, 14 years ago
parent 35cb52ee88
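
For orientation before the per-file diffs: the commit introduces a single integer property. A minimal client-side sketch of tuning it, assuming a plain JobConf setup (only the property name and its meaning come from this commit; the surrounding job setup is illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapred.JobConf;

    // Hypothetical job setup; only the property name is defined by this commit.
    JobConf job = new JobConf(new Configuration());
    // Allow up to 100 recorded locations per split before the client truncates.
    job.setInt("mapreduce.job.max.split.locations", 100);

On the JobTracker side the same property, read from the daemon's configuration, turns the advisory limit into a hard one, as the diffs below show.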

+ 2 - 1
src/mapred/org/apache/hadoop/mapred/JobInProgress.java

@@ -775,7 +775,8 @@ public class JobInProgress {
   TaskSplitMetaInfo[] createSplits(org.apache.hadoop.mapreduce.JobID jobId)
   throws IOException {
     TaskSplitMetaInfo[] allTaskSplitMetaInfo =
-      SplitMetaInfoReader.readSplitMetaInfo(jobId, fs, conf, jobSubmitDir);
+      SplitMetaInfoReader.readSplitMetaInfo(jobId, fs, jobtracker.getConf(),
+          jobSubmitDir);
     return allTaskSplitMetaInfo;
   }
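
The one-line change above is what makes the server-side check authoritative: SplitMetaInfoReader is now handed jobtracker.getConf() instead of the job's own conf, so a submitter cannot relax the limit. A minimal sketch of the resulting lookup (the helper name is hypothetical; the property and default come from the SplitMetaInfoReader diff below):

    // Illustrative: the limit is resolved against the daemon's configuration
    // (serverConf), never the user-submitted job configuration.
    static int resolveMaxLocations(Configuration serverConf) {
      // Unset means "no hard limit" on the reading side.
      return serverConf.getInt("mapreduce.job.max.split.locations",
                               Integer.MAX_VALUE);
    }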
 

+ 16 - 7
src/mapred/org/apache/hadoop/mapreduce/split/JobSplitWriter.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.mapreduce.split;
 
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
@@ -35,15 +36,19 @@ import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobSubmissionFiles;
 import org.apache.hadoop.mapreduce.split.JobSplit.SplitMetaInfo;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
 /**
  * The class that is used by the Job clients to write splits (both the meta
  * and the raw bytes parts)
  */
 public class JobSplitWriter {
 
+  private static final Log LOG = LogFactory.getLog(JobSplitWriter.class);
   private static final int splitVersion = JobSplit.META_SPLIT_VERSION;
   private static final byte[] SPLIT_FILE_HEADER;
-  private static final int MAX_BLOCK_LOCATIONS = 100;
+  static final String MAX_SPLIT_LOCATIONS = "mapreduce.job.max.split.locations";
   
   static {
     try {
@@ -121,10 +126,12 @@ public class JobSplitWriter {
         serializer.serialize(split);
         int currCount = out.size();
         String[] locations = split.getLocations();
-        if (locations.length > MAX_BLOCK_LOCATIONS) {
-          throw new IOException("Max block location exceeded for split: "
+        final int max_loc = conf.getInt(MAX_SPLIT_LOCATIONS, 10);
+        if (locations.length > max_loc) {
+          LOG.warn("Max block location exceeded for split: "
               + split + " splitsize: " + locations.length +
-              " maxsize: " + MAX_BLOCK_LOCATIONS);
+              " maxsize: " + max_loc);
+          locations = Arrays.copyOf(locations, max_loc);
         }
         info[i++] = 
           new JobSplit.SplitMetaInfo( 
@@ -149,10 +156,12 @@ public class JobSplitWriter {
         split.write(out);
         int currLen = out.size();
         String[] locations = split.getLocations();
-        if (locations.length > MAX_BLOCK_LOCATIONS) {
-          throw new IOException("Max block location exceeded for split: "
+        final int max_loc = conf.getInt(MAX_SPLIT_LOCATIONS, 10);
+        if (locations.length > max_loc) {
+          LOG.warn("Max block location exceeded for split: "
               + split + " splitsize: " + locations.length +
-              " maxsize: " + MAX_BLOCK_LOCATIONS);
+              " maxsize: " + max_loc);
+          locations = Arrays.copyOf(locations, max_loc);
         }
         info[i++] = new JobSplit.SplitMetaInfo( 
             locations, offset,
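
The client-side check is now advisory: instead of throwing, the writer logs a warning and truncates the locations array. A self-contained illustration of the Arrays.copyOf semantics relied on here (host names are made up):

    import java.util.Arrays;

    // copyOf with a shorter length keeps the leading entries and drops the
    // rest, which is exactly the truncation applied above.
    String[] locs = { "rack1/h1", "rack1/h2", "rack2/h3", "rack2/h4" };
    String[] kept = Arrays.copyOf(locs, 2);   // => { "rack1/h1", "rack1/h2" }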

+ 7 - 0
src/mapred/org/apache/hadoop/mapreduce/split/SplitMetaInfoReader.java

@@ -62,9 +62,16 @@ public class SplitMetaInfoReader {
     int numSplits = WritableUtils.readVInt(in); //TODO: check for insane values
     JobSplit.TaskSplitMetaInfo[] allSplitMetaInfo = 
       new JobSplit.TaskSplitMetaInfo[numSplits];
+    final int maxLocations =
+      conf.getInt(JobSplitWriter.MAX_SPLIT_LOCATIONS, Integer.MAX_VALUE);
     for (int i = 0; i < numSplits; i++) {
       JobSplit.SplitMetaInfo splitMetaInfo = new JobSplit.SplitMetaInfo();
       splitMetaInfo.readFields(in);
+      final int numLocations = splitMetaInfo.getLocations().length;
+      if (numLocations > maxLocations) {
+        throw new IOException("Max block location exceeded for split: #"  + i +
+              " splitsize: " + numLocations + " maxsize: " + maxLocations);
+      }
       JobSplit.TaskSplitIndex splitIndex = new JobSplit.TaskSplitIndex(
           JobSubmissionFiles.getJobSplitFile(jobSubmitDir).toString(), 
           splitMetaInfo.getStartOffset());
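
Note the asymmetry in defaults: the writer falls back to 10 while the reader falls back to Integer.MAX_VALUE, so a JobTracker only fails jobs on this check when the property is set explicitly in the daemon's configuration. A hedged sketch of what a caller sees when the limit is exceeded, assuming jobId, fs, conf, and jobSubmitDir are in scope as in JobInProgress.createSplits above:

    try {
      SplitMetaInfoReader.readSplitMetaInfo(jobId, fs, conf, jobSubmitDir);
    } catch (IOException e) {
      // Message: "Max block location exceeded for split: #<i>
      //           splitsize: <n> maxsize: <m>"
      // The JobTracker surfaces this as a failed job.
    }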

+ 25 - 22
src/test/org/apache/hadoop/mapred/TestBlockLimits.java → src/test/org/apache/hadoop/mapreduce/split/TestBlockLimits.java

@@ -15,24 +15,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-package org.apache.hadoop.mapred;
+package org.apache.hadoop.mapreduce.split;
 
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Iterator;
 
 import junit.framework.TestCase;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.*;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.StringUtils;
 
@@ -49,6 +50,9 @@ public class TestBlockLimits extends TestCase {
     MiniMRCluster mr = null;
     try {
       mr = new MiniMRCluster(2, "file:///", 3);
+      Configuration conf = new Configuration();
+      conf.setInt(JobSplitWriter.MAX_SPLIT_LOCATIONS, 10);
+      mr = new MiniMRCluster(2, "file:///", 3, null, null, new JobConf(conf));
       runCustomFormat(mr);
     } finally {
       if (mr != null) { mr.shutdown(); }
@@ -56,7 +60,8 @@ public class TestBlockLimits extends TestCase {
   }
   
   private void runCustomFormat(MiniMRCluster mr) throws IOException {
-    JobConf job = mr.createJobConf();
+    JobConf job = new JobConf(mr.createJobConf());
+    job.setInt(JobSplitWriter.MAX_SPLIT_LOCATIONS, 100);
     FileSystem fileSys = FileSystem.get(job);
     Path testDir = new Path(TEST_ROOT_DIR + "/test_mini_mr_local");
     Path outDir = new Path(testDir, "out");
@@ -65,16 +70,14 @@ public class TestBlockLimits extends TestCase {
     job.setInputFormat(MyInputFormat.class);
     job.setOutputFormat(MyOutputFormat.class);
     job.setOutputKeyClass(Text.class);
-    job.setOutputValueClass(IntWritable.class);
+    job.setOutputValueClass(Text.class);
     
     job.setMapperClass(MyMapper.class);        
-    job.setReducerClass(MyReducer.class);
-    job.setNumMapTasks(100);
-    job.setNumReduceTasks(1);
+    job.setNumReduceTasks(0);
     job.set("non.std.out", outDir.toString());
     try {
       JobClient.runJob(job);
-      assertTrue(false);
+      fail("JobTracker neglected to fail misconfigured job");
     } catch(IOException ie) {
       System.out.println("Failed job " + StringUtils.stringifyException(ie));
     } finally {
@@ -93,17 +96,8 @@ public class TestBlockLimits extends TestCase {
     }
   }
 
-  static class MyReducer extends MapReduceBase
-    implements Reducer<WritableComparable, Writable,
-                    WritableComparable, Writable> {
-      public void reduce(WritableComparable key, Iterator<Writable> values,
-                     OutputCollector<WritableComparable, Writable> output,
-                     Reporter reporter) throws IOException {
-      }
-  }
-
   private static class MyInputFormat
-    implements InputFormat<IntWritable, Text> {
+    implements InputFormat<Text, Text> {
     
     private static class MySplit implements InputSplit {
       int first;
@@ -117,7 +111,9 @@ public class TestBlockLimits extends TestCase {
       }
 
       public String[] getLocations() {
-        return new String[200];
+        final String[] ret = new String[200];
+        Arrays.fill(ret, "SPLIT");
+        return ret;
       }
 
       public long getLength() {
@@ -141,12 +137,19 @@ public class TestBlockLimits extends TestCase {
                            new MySplit(4, 2)};
     }
 
-    public RecordReader<IntWritable, Text> getRecordReader(InputSplit split,
+    public RecordReader<Text, Text> getRecordReader(InputSplit split,
                                                            JobConf job, 
                                                            Reporter reporter)
                                                            throws IOException {
       MySplit sp = (MySplit) split;
-      return null;
+      return new RecordReader<Text,Text>() {
+        @Override public boolean next(Text key, Text value) { return false; }
+        @Override public Text createKey() { return new Text(); }
+        @Override public Text createValue() { return new Text(); }
+        @Override public long getPos() throws IOException { return 0; }
+        @Override public void close() throws IOException { }
+        @Override public float getProgress() throws IOException { return 1.0f; }
+      };
     }
     
   }
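
The reworked test drives both halves of the change at once: the job conf raises the client limit to 100, so the 200 locations declared by MySplit are truncated to 100 at submission, while the cluster conf pins the JobTracker's limit at 10, so the reader rejects the splits and JobClient.runJob throws the expected IOException. The arithmetic in miniature (values taken from the diff above):

    import java.util.Arrays;

    String[] declared = new String[200];        // MySplit.getLocations()
    Arrays.fill(declared, "SPLIT");
    int clientMax = 100;                        // job conf setting in the test
    String[] written = Arrays.copyOf(declared, clientMax);  // client truncates
    int serverMax = 10;                         // cluster conf setting
    assert written.length > serverMax;          // reader throws, job fails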

+ 138 - 0
src/test/org/apache/hadoop/mapreduce/split/TestJobSplitWriter.java

@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.split;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+public class TestJobSplitWriter {
+
+  static final String TEST_ROOT = System.getProperty("test.build.data", "/tmp");
+  static final Path TEST_DIR =
+    new Path(TEST_ROOT, TestJobSplitWriter.class.getSimpleName());
+
+  @AfterClass
+  public static void cleanup() throws IOException {
+    final FileSystem fs = FileSystem.getLocal(new Configuration()).getRaw();
+    fs.delete(TEST_DIR, true);
+  }
+
+  static abstract class NewSplit extends InputSplit implements Writable {
+    @Override public long getLength() { return 42L; }
+    @Override public void readFields(DataInput in) throws IOException { }
+    @Override public void write(DataOutput in) throws IOException { }
+  }
+
+  @Test
+  public void testSplitLocationLimit()
+      throws IOException, InterruptedException  {
+    final int SPLITS = 5;
+    final int MAX_LOC = 10;
+    final Path outdir = new Path(TEST_DIR, "testSplitLocationLimit");
+    final String[] locs = getLoc(MAX_LOC + 5);
+    final Configuration conf = new Configuration();
+    final FileSystem rfs = FileSystem.getLocal(conf).getRaw();
+    final InputSplit split = new NewSplit() {
+      @Override public String[] getLocations() { return locs; }
+    };
+    List<InputSplit> splits = Collections.nCopies(SPLITS, split);
+
+    conf.setInt(JobSplitWriter.MAX_SPLIT_LOCATIONS, MAX_LOC);
+    JobSplitWriter.createSplitFiles(outdir, conf,
+        FileSystem.getLocal(conf).getRaw(), splits);
+
+    checkMeta(MAX_LOC,
+        SplitMetaInfoReader.readSplitMetaInfo(null, rfs, conf, outdir),
+        Arrays.copyOf(locs, MAX_LOC));
+
+    conf.setInt(JobSplitWriter.MAX_SPLIT_LOCATIONS, MAX_LOC / 2);
+    try {
+      SplitMetaInfoReader.readSplitMetaInfo(null, rfs, conf, outdir);
+      fail("Reader failed to detect location limit");
+    } catch (IOException e) { }
+  }
+
+  static abstract class OldSplit
+      implements org.apache.hadoop.mapred.InputSplit {
+    @Override public long getLength() { return 42L; }
+    @Override public void readFields(DataInput in) throws IOException { }
+    @Override public void write(DataOutput in) throws IOException { }
+  }
+
+  @Test
+  public void testSplitLocationLimitOldApi() throws IOException {
+    final int SPLITS = 5;
+    final int MAX_LOC = 10;
+    final Path outdir = new Path(TEST_DIR, "testSplitLocationLimitOldApi");
+    final String[] locs = getLoc(MAX_LOC + 5);
+    final Configuration conf = new Configuration();
+    final FileSystem rfs = FileSystem.getLocal(conf).getRaw();
+    final org.apache.hadoop.mapred.InputSplit split = new OldSplit() {
+      @Override public String[] getLocations() { return locs; }
+    };
+    org.apache.hadoop.mapred.InputSplit[] splits =
+      new org.apache.hadoop.mapred.InputSplit[SPLITS];
+    Arrays.fill(splits, split);
+
+    conf.setInt(JobSplitWriter.MAX_SPLIT_LOCATIONS, MAX_LOC);
+    JobSplitWriter.createSplitFiles(outdir, conf,
+        FileSystem.getLocal(conf).getRaw(), splits);
+    checkMeta(MAX_LOC,
+        SplitMetaInfoReader.readSplitMetaInfo(null, rfs, conf, outdir),
+        Arrays.copyOf(locs, MAX_LOC));
+
+    conf.setInt(JobSplitWriter.MAX_SPLIT_LOCATIONS, MAX_LOC / 2);
+    try {
+      SplitMetaInfoReader.readSplitMetaInfo(null, rfs, conf, outdir);
+      fail("Reader failed to detect location limit");
+    } catch (IOException e) { }
+  }
+
+  private static void checkMeta(int MAX_LOC,
+      JobSplit.TaskSplitMetaInfo[] metaSplits, String[] chk_locs) {
+    for (JobSplit.TaskSplitMetaInfo meta : metaSplits) {
+      final String[] meta_locs = meta.getLocations();
+      assertEquals(MAX_LOC, meta_locs.length);
+      assertArrayEquals(chk_locs, meta_locs);
+    }
+  }
+
+  private static String[] getLoc(int locations) {
+    final String ret[] = new String[locations];
+    for (int i = 0; i < locations; ++i) {
+      ret[i] = "LOC" + i;
+    }
+    return ret;
+  }
+
+}