
MAPREDUCE-6703. Add flag to allow MapReduce AM to request for OPPORTUNISTIC containers. Contributed by Arun Suresh

Jian He 9 years ago
parent
commit
ae353ea969

+ 108 - 68
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java

@@ -104,6 +104,7 @@ public class RMContainerAllocator extends RMContainerRequestor
   static final Priority PRIORITY_FAST_FAIL_MAP;
   static final Priority PRIORITY_REDUCE;
   static final Priority PRIORITY_MAP;
+  static final Priority PRIORITY_OPPORTUNISTIC_MAP;
 
   @VisibleForTesting
   public static final String RAMPDOWN_DIAGNOSTIC = "Reducer preempted "
@@ -119,6 +120,10 @@ public class RMContainerAllocator extends RMContainerRequestor
     PRIORITY_REDUCE.setPriority(10);
     PRIORITY_MAP = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Priority.class);
     PRIORITY_MAP.setPriority(20);
+    PRIORITY_OPPORTUNISTIC_MAP =
+        RecordFactoryProvider.getRecordFactory(null).newRecordInstance(
+            Priority.class);
+    PRIORITY_OPPORTUNISTIC_MAP.setPriority(19);
   }
   
   /*
@@ -226,6 +231,9 @@ public class RMContainerAllocator extends RMContainerRequestor
     // Init startTime to current time. If all goes well, it will be reset after
     // first attempt to contact RM.
     retrystartTime = System.currentTimeMillis();
+    this.scheduledRequests.setNumOpportunisticMapsPer100(
+        conf.getInt(MRJobConfig.MR_NUM_OPPORTUNISTIC_MAPS_PER_100,
+            MRJobConfig.DEFAULT_MR_NUM_OPPORTUNISTIC_MAPS_PER_100));
   }
 
   @Override
@@ -852,6 +860,8 @@ public class RMContainerAllocator extends RMContainerRequestor
       setRequestLimit(PRIORITY_FAST_FAIL_MAP, mapResourceRequest,
           failedMapRequestLimit);
       setRequestLimit(PRIORITY_MAP, mapResourceRequest, normalMapRequestLimit);
+      setRequestLimit(PRIORITY_OPPORTUNISTIC_MAP, mapResourceRequest,
+          normalMapRequestLimit);
     }
 
     int numScheduledReduces = scheduledRequests.reduces.size();
@@ -979,6 +989,12 @@ public class RMContainerAllocator extends RMContainerRequestor
     @VisibleForTesting
     final Map<TaskAttemptId, ContainerRequest> maps =
       new LinkedHashMap<TaskAttemptId, ContainerRequest>();
+    int mapsMod100 = 0;
+    int numOpportunisticMapsPer100 = 0;
+
+    void setNumOpportunisticMapsPer100(int numMaps) {
+      this.numOpportunisticMapsPer100 = numMaps;
+    }
 
     @VisibleForTesting
     final LinkedHashMap<TaskAttemptId, ContainerRequest> reduces =
@@ -1020,34 +1036,47 @@ public class RMContainerAllocator extends RMContainerRequestor
             new ContainerRequest(event, PRIORITY_FAST_FAIL_MAP,
                 mapNodeLabelExpression);
         LOG.info("Added "+event.getAttemptID()+" to list of failed maps");
+        // If it's an earlier failed attempt, do not retry as OPPORTUNISTIC
+        maps.put(event.getAttemptID(), request);
+        addContainerReq(request);
       } else {
-        for (String host : event.getHosts()) {
-          LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
-          if (list == null) {
-            list = new LinkedList<TaskAttemptId>();
-            mapsHostMapping.put(host, list);
+        if (mapsMod100 < numOpportunisticMapsPer100) {
+          request =
+              new ContainerRequest(event, PRIORITY_OPPORTUNISTIC_MAP,
+                  mapNodeLabelExpression);
+          maps.put(event.getAttemptID(), request);
+          addOpportunisticResourceRequest(request.priority, request.capability);
+        } else {
+          request =
+              new ContainerRequest(event, PRIORITY_MAP, mapNodeLabelExpression);
+          for (String host : event.getHosts()) {
+            LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
+            if (list == null) {
+              list = new LinkedList<TaskAttemptId>();
+              mapsHostMapping.put(host, list);
+            }
+            list.add(event.getAttemptID());
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Added attempt req to host " + host);
+            }
           }
-          list.add(event.getAttemptID());
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Added attempt req to host " + host);
+          for (String rack : event.getRacks()) {
+            LinkedList<TaskAttemptId> list = mapsRackMapping.get(rack);
+            if (list == null) {
+              list = new LinkedList<TaskAttemptId>();
+              mapsRackMapping.put(rack, list);
+            }
+            list.add(event.getAttemptID());
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Added attempt req to rack " + rack);
+            }
           }
-       }
-       for (String rack: event.getRacks()) {
-         LinkedList<TaskAttemptId> list = mapsRackMapping.get(rack);
-         if (list == null) {
-           list = new LinkedList<TaskAttemptId>();
-           mapsRackMapping.put(rack, list);
-         }
-         list.add(event.getAttemptID());
-         if (LOG.isDebugEnabled()) {
-            LOG.debug("Added attempt req to rack " + rack);
-         }
-       }
-        request =
-            new ContainerRequest(event, PRIORITY_MAP, mapNodeLabelExpression);
+          maps.put(event.getAttemptID(), request);
+          addContainerReq(request);
+        }
+        mapsMod100++;
+        mapsMod100 %= 100;
       }
-      maps.put(event.getAttemptID(), request);
-      addContainerReq(request);
     }
     
     
@@ -1077,7 +1106,8 @@ public class RMContainerAllocator extends RMContainerRequestor
         Priority priority = allocated.getPriority();
         Resource allocatedResource = allocated.getResource();
         if (PRIORITY_FAST_FAIL_MAP.equals(priority) 
-            || PRIORITY_MAP.equals(priority)) {
+            || PRIORITY_MAP.equals(priority)
+            || PRIORITY_OPPORTUNISTIC_MAP.equals(priority)) {
           if (ResourceCalculatorUtils.computeAvailableContainers(allocatedResource,
               mapResourceRequest, getSchedulerResourceTypes()) <= 0
               || maps.isEmpty()) {
@@ -1235,7 +1265,8 @@ public class RMContainerAllocator extends RMContainerRequestor
         LOG.info("Found replacement: " + toBeReplaced);
         return toBeReplaced;
       }
-      else if (PRIORITY_MAP.equals(priority)) {
+      else if (PRIORITY_MAP.equals(priority)
+          || PRIORITY_OPPORTUNISTIC_MAP.equals(priority)) {
         LOG.info("Replacing MAP container " + allocated.getId());
         // allocated container was for a map
         String host = allocated.getNodeId().getHost();
@@ -1298,29 +1329,33 @@ public class RMContainerAllocator extends RMContainerRequestor
       while(it.hasNext() && maps.size() > 0 && canAssignMaps()){
         Container allocated = it.next();        
         Priority priority = allocated.getPriority();
-        assert PRIORITY_MAP.equals(priority);
-        // "if (maps.containsKey(tId))" below should be almost always true.
-        // hence this while loop would almost always have O(1) complexity
-        String host = allocated.getNodeId().getHost();
-        LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
-        while (list != null && list.size() > 0) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Host matched to the request list " + host);
-          }
-          TaskAttemptId tId = list.removeFirst();
-          if (maps.containsKey(tId)) {
-            ContainerRequest assigned = maps.remove(tId);
-            containerAssigned(allocated, assigned);
-            it.remove();
-            JobCounterUpdateEvent jce =
-              new JobCounterUpdateEvent(assigned.attemptID.getTaskId().getJobId());
-            jce.addCounterUpdate(JobCounter.DATA_LOCAL_MAPS, 1);
-            eventHandler.handle(jce);
-            hostLocalAssigned++;
+        assert (PRIORITY_MAP.equals(priority)
+            || PRIORITY_OPPORTUNISTIC_MAP.equals(priority));
+        if (!PRIORITY_OPPORTUNISTIC_MAP.equals(priority)) {
+          // "if (maps.containsKey(tId))" below should be almost always true.
+          // hence this while loop would almost always have O(1) complexity
+          String host = allocated.getNodeId().getHost();
+          LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
+          while (list != null && list.size() > 0) {
             if (LOG.isDebugEnabled()) {
-              LOG.debug("Assigned based on host match " + host);
+              LOG.debug("Host matched to the request list " + host);
+            }
+            TaskAttemptId tId = list.removeFirst();
+            if (maps.containsKey(tId)) {
+              ContainerRequest assigned = maps.remove(tId);
+              containerAssigned(allocated, assigned);
+              it.remove();
+              JobCounterUpdateEvent jce =
+                  new JobCounterUpdateEvent(assigned.attemptID.getTaskId()
+                      .getJobId());
+              jce.addCounterUpdate(JobCounter.DATA_LOCAL_MAPS, 1);
+              eventHandler.handle(jce);
+              hostLocalAssigned++;
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Assigned based on host match " + host);
+              }
+              break;
             }
-            break;
           }
         }
       }
@@ -1330,27 +1365,31 @@ public class RMContainerAllocator extends RMContainerRequestor
       while(it.hasNext() && maps.size() > 0 && canAssignMaps()){
         Container allocated = it.next();
         Priority priority = allocated.getPriority();
-        assert PRIORITY_MAP.equals(priority);
-        // "if (maps.containsKey(tId))" below should be almost always true.
-        // hence this while loop would almost always have O(1) complexity
-        String host = allocated.getNodeId().getHost();
-        String rack = RackResolver.resolve(host).getNetworkLocation();
-        LinkedList<TaskAttemptId> list = mapsRackMapping.get(rack);
-        while (list != null && list.size() > 0) {
-          TaskAttemptId tId = list.removeFirst();
-          if (maps.containsKey(tId)) {
-            ContainerRequest assigned = maps.remove(tId);
-            containerAssigned(allocated, assigned);
-            it.remove();
-            JobCounterUpdateEvent jce =
-              new JobCounterUpdateEvent(assigned.attemptID.getTaskId().getJobId());
-            jce.addCounterUpdate(JobCounter.RACK_LOCAL_MAPS, 1);
-            eventHandler.handle(jce);
-            rackLocalAssigned++;
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Assigned based on rack match " + rack);
+        assert (PRIORITY_MAP.equals(priority)
+            || PRIORITY_OPPORTUNISTIC_MAP.equals(priority));
+        if (!PRIORITY_OPPORTUNISTIC_MAP.equals(priority)) {
+          // "if (maps.containsKey(tId))" below should be almost always true.
+          // hence this while loop would almost always have O(1) complexity
+          String host = allocated.getNodeId().getHost();
+          String rack = RackResolver.resolve(host).getNetworkLocation();
+          LinkedList<TaskAttemptId> list = mapsRackMapping.get(rack);
+          while (list != null && list.size() > 0) {
+            TaskAttemptId tId = list.removeFirst();
+            if (maps.containsKey(tId)) {
+              ContainerRequest assigned = maps.remove(tId);
+              containerAssigned(allocated, assigned);
+              it.remove();
+              JobCounterUpdateEvent jce =
+                  new JobCounterUpdateEvent(assigned.attemptID.getTaskId()
+                      .getJobId());
+              jce.addCounterUpdate(JobCounter.RACK_LOCAL_MAPS, 1);
+              eventHandler.handle(jce);
+              rackLocalAssigned++;
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Assigned based on rack match " + rack);
+              }
+              break;
             }
-            break;
           }
         }
       }
@@ -1360,7 +1399,8 @@ public class RMContainerAllocator extends RMContainerRequestor
       while(it.hasNext() && maps.size() > 0 && canAssignMaps()){
         Container allocated = it.next();
         Priority priority = allocated.getPriority();
-        assert PRIORITY_MAP.equals(priority);
+        assert (PRIORITY_MAP.equals(priority)
+            || PRIORITY_OPPORTUNISTIC_MAP.equals(priority));
         TaskAttemptId tId = maps.keySet().iterator().next();
         ContainerRequest assigned = maps.remove(tId);
         containerAssigned(allocated, assigned);

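In short, the allocator now splits fresh map requests with a per-100 counter: the first numOpportunisticMapsPer100 of every 100 are requested at the new PRIORITY_OPPORTUNISTIC_MAP (19) with ExecutionType.OPPORTUNISTIC and no host/rack locality, the remainder keep the locality-aware PRIORITY_MAP (20) path, and retries of failed attempts always stay GUARANTEED at PRIORITY_FAST_FAIL_MAP. A minimal standalone sketch of that counter logic, under a hypothetical class name (the real logic lives inline in ScheduledRequests.addMap):

    // Hypothetical helper distilling the mapsMod100 selection above; not part
    // of the patch itself.
    public class OpportunisticMapSelector {
      private int mapsMod100 = 0;
      private final int numOpportunisticMapsPer100;

      public OpportunisticMapSelector(int numOpportunisticMapsPer100) {
        this.numOpportunisticMapsPer100 = numOpportunisticMapsPer100;
      }

      /** Decides whether the next map attempt is requested as OPPORTUNISTIC. */
      public boolean nextIsOpportunistic() {
        boolean opportunistic = mapsMod100 < numOpportunisticMapsPer100;
        mapsMod100 = (mapsMod100 + 1) % 100;
        return opportunistic;
      }
    }

Because opportunistic requests are made only at ResourceRequest.ANY, the host- and rack-matching loops in the assignment code above skip containers carrying PRIORITY_OPPORTUNISTIC_MAP; such containers fall through to the final any-host assignment loop, which now accepts both map priorities.
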
+ 16 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java

@@ -41,6 +41,7 @@ import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
@@ -389,7 +390,7 @@ public abstract class RMContainerRequestor extends RMCommunicator {
   protected Resource getAvailableResources() {
     return availableResources == null ? Resources.none() : availableResources;
   }
-  
+
   protected void addContainerReq(ContainerRequest req) {
     // Create resource requests
     for (String host : req.hosts) {
@@ -424,8 +425,21 @@ public abstract class RMContainerRequestor extends RMCommunicator {
     decResourceRequest(req.priority, ResourceRequest.ANY, req.capability);
   }
 
+  protected void addOpportunisticResourceRequest(Priority priority,
+      Resource capability) {
+    addResourceRequest(priority, ResourceRequest.ANY, capability, null,
+        ExecutionType.OPPORTUNISTIC);
+  }
+
   private void addResourceRequest(Priority priority, String resourceName,
       Resource capability, String nodeLabelExpression) {
+    addResourceRequest(priority, resourceName, capability, nodeLabelExpression,
+        ExecutionType.GUARANTEED);
+  }
+
+  private void addResourceRequest(Priority priority, String resourceName,
+      Resource capability, String nodeLabelExpression,
+      ExecutionType executionType) {
     Map<String, Map<Resource, ResourceRequest>> remoteRequests =
       this.remoteRequestsTable.get(priority);
     if (remoteRequests == null) {
@@ -448,6 +462,7 @@ public abstract class RMContainerRequestor extends RMCommunicator {
       remoteRequest.setCapability(capability);
       remoteRequest.setNumContainers(0);
       remoteRequest.setNodeLabelExpression(nodeLabelExpression);
+      remoteRequest.setExecutionType(executionType);
       reqMap.put(capability, remoteRequest);
     }
     remoteRequest.setNumContainers(remoteRequest.getNumContainers() + 1);

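On the requestor side, the change threads an ExecutionType parameter through addResourceRequest, defaulting to GUARANTEED so existing call sites behave exactly as before, while the new addOpportunisticResourceRequest issues a single ResourceRequest.ANY request tagged OPPORTUNISTIC. A hedged sketch of the record that produces, mirroring only the calls visible in this patch (setExecutionType is the ResourceRequest API as of this commit; the class and method names below are illustrative):

    import org.apache.hadoop.yarn.api.records.ExecutionType;
    import org.apache.hadoop.yarn.api.records.Priority;
    import org.apache.hadoop.yarn.api.records.Resource;
    import org.apache.hadoop.yarn.api.records.ResourceRequest;
    import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;

    final class OpportunisticRequests {
      private OpportunisticRequests() {
      }

      /** Builds the ANY-host OPPORTUNISTIC request the new helper sends. */
      static ResourceRequest newOpportunisticRequest(Priority priority,
          Resource capability) {
        ResourceRequest req = RecordFactoryProvider.getRecordFactory(null)
            .newRecordInstance(ResourceRequest.class);
        req.setPriority(priority);                 // PRIORITY_OPPORTUNISTIC_MAP (19)
        req.setResourceName(ResourceRequest.ANY);  // no host/rack locality
        req.setCapability(capability);             // map container size
        req.setNumContainers(1);                   // bumped once per attempt
        req.setExecutionType(ExecutionType.OPPORTUNISTIC); // setter as of this commit
        return req;
      }
    }
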
+ 11 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java

@@ -966,4 +966,15 @@ public interface MRJobConfig {
   public static final int DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_BUFFER_KB =
           128;
 
+  /**
+   * Number of OPPORTUNISTIC containers per 100 containers that will be
+   * requested by the MRAppMaster. The default value is 0, which implies all
+   * maps will be guaranteed. A value of 100 means all maps will be requested
+   * as opportunistic. For any other value, say 'x', the first 'x' of every
+   * 100 maps requested by the AM will be opportunistic. If the total number
+   * of maps for the job is less than 'x', then ALL maps will be
+   * OPPORTUNISTIC.
+   */
+  public static final String MR_NUM_OPPORTUNISTIC_MAPS_PER_100 =
+      "mapreduce.job.num-opportunistic-maps-per-100";
+  public static final int DEFAULT_MR_NUM_OPPORTUNISTIC_MAPS_PER_100 = 0;
 }

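Turning the feature on from a client is a one-line configuration change. A hedged usage sketch (the value 40 is illustrative; note the cluster must also run with the YARN distributed-scheduling and container-queuing settings the test below enables: AMRM_PROXY_ENABLED, DIST_SCHEDULING_ENABLED, NM_CONTAINER_QUEUING_ENABLED):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.MRJobConfig;

    public class OpportunisticMapsExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Illustrative value: request the first 40 of every 100 map containers
        // as OPPORTUNISTIC. 0 (the default) keeps every map GUARANTEED; 100
        // makes every map opportunistic.
        conf.setInt(MRJobConfig.MR_NUM_OPPORTUNISTIC_MAPS_PER_100, 40);
        // ...build and submit the job with this conf as usual.
      }
    }
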
+ 281 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMROpportunisticMaps.java

@@ -0,0 +1,281 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+
+import static org.junit.Assert.*;
+
+/**
+ * Simple MapReduce job to test the ability of the MRAppMaster to request and
+ * use OPPORTUNISTIC containers.
+ * This test runs a simple external merge sort using MapReduce.
+ * The Hadoop framework's merge on the reduce side will merge the partitions
+ * created to generate the final output, which is sorted on the key.
+ */
+@SuppressWarnings(value={"unchecked", "deprecation"})
+public class TestMROpportunisticMaps {
+  // Where MR job's input will reside.
+  private static final Path INPUT_DIR = new Path("/test/input");
+  // Where output goes.
+  private static final Path OUTPUT = new Path("/test/output");
+
+  /**
+   * Test will run with 4 maps, all OPPORTUNISTIC.
+   * @throws Exception
+   */
+  @Test
+  public void testAllOpportunisticMaps() throws Exception {
+    doTest(4, 1, 1, 4);
+  }
+
+  /**
+   * Test will run with 4 maps, 2 OPPORTUNISTIC and 2 GUARANTEED.
+   * @throws Exception
+   */
+  @Test
+  public void testHalfOpportunisticMaps() throws Exception {
+    doTest(4, 1, 1, 2);
+  }
+
+  /**
+   * Test will run with 6 maps and 2 reducers. All the maps are OPPORTUNISTIC.
+   * @throws Exception
+   */
+  @Test
+  public void testMultipleReducers() throws Exception {
+    doTest(6, 2, 1, 6);
+  }
+
+  public void doTest(int numMappers, int numReducers, int numNodes,
+      int percent) throws Exception {
+    doTest(numMappers, numReducers, numNodes, 1000, percent);
+  }
+
+  public void doTest(int numMappers, int numReducers, int numNodes,
+      int numLines, int percent) throws Exception {
+    MiniDFSCluster dfsCluster = null;
+    MiniMRClientCluster mrCluster = null;
+    FileSystem fileSystem = null;
+    try {
+      Configuration conf = new Configuration();
+      // Start the mini-MR and mini-DFS clusters
+      conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
+      conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true);
+      conf.setBoolean(YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED, true);
+      dfsCluster = new MiniDFSCluster.Builder(conf)
+          .numDataNodes(numNodes).build();
+      fileSystem = dfsCluster.getFileSystem();
+      mrCluster = MiniMRClientClusterFactory.create(this.getClass(),
+          numNodes, conf);
+      // Generate input.
+      createInput(fileSystem, numMappers, numLines);
+      // Run the test.
+
+      Configuration jobConf = mrCluster.getConfig();
+      jobConf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS,
+          YarnConfiguration.DEFAULT_AMRM_PROXY_ADDRESS);
+
+      runMergeTest(new JobConf(jobConf), fileSystem,
+          numMappers, numReducers, numLines, percent);
+    } finally {
+      if (dfsCluster != null) {
+        dfsCluster.shutdown();
+      }
+      if (mrCluster != null) {
+        mrCluster.stop();
+      }
+    }
+  }
+
+  private void createInput(FileSystem fs, int numMappers, int numLines)
+      throws Exception {
+    fs.delete(INPUT_DIR, true);
+    for (int i = 0; i < numMappers; i++) {
+      OutputStream os = fs.create(new Path(INPUT_DIR, "input_" + i + ".txt"));
+      Writer writer = new OutputStreamWriter(os);
+      for (int j = 0; j < numLines; j++) {
+        // Create sorted key, value pairs.
+        int k = j + 1;
+        String formattedNumber = String.format("%09d", k);
+        writer.write(formattedNumber + " " + formattedNumber + "\n");
+      }
+      writer.close();
+    }
+  }
+
+  private void runMergeTest(JobConf job, FileSystem fileSystem, int
+      numMappers, int numReducers, int numLines, int percent)
+      throws Exception {
+    fileSystem.delete(OUTPUT, true);
+    job.setJobName("Test");
+    JobClient client = new JobClient(job);
+    RunningJob submittedJob = null;
+    FileInputFormat.setInputPaths(job, INPUT_DIR);
+    FileOutputFormat.setOutputPath(job, OUTPUT);
+    job.set("mapreduce.output.textoutputformat.separator", " ");
+    job.setInputFormat(TextInputFormat.class);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(Text.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(Text.class);
+    job.setMapperClass(MyMapper.class);
+    job.setPartitionerClass(MyPartitioner.class);
+    job.setOutputFormat(TextOutputFormat.class);
+    job.setNumReduceTasks(numReducers);
+
+    // Request 'percent' of every 100 maps as OPPORTUNISTIC
+    job.setInt(MRJobConfig.MR_NUM_OPPORTUNISTIC_MAPS_PER_100, percent);
+    job.setInt("mapreduce.map.maxattempts", 1);
+    job.setInt("mapreduce.reduce.maxattempts", 1);
+    job.setInt("mapred.test.num_lines", numLines);
+    job.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
+    try {
+      submittedJob = client.submitJob(job);
+      try {
+        if (!client.monitorAndPrintJob(job, submittedJob)) {
+          throw new IOException("Job failed!");
+        }
+      } catch(InterruptedException ie) {
+        Thread.currentThread().interrupt();
+      }
+    } catch(IOException ioe) {
+      System.err.println("Job failed with: " + ioe);
+    } finally {
+      verifyOutput(fileSystem, numMappers, numLines);
+    }
+  }
+
+  private void verifyOutput(FileSystem fileSystem, int numMappers, int numLines)
+      throws Exception {
+    FSDataInputStream dis = null;
+    long numValidRecords = 0;
+    long numInvalidRecords = 0;
+    String prevKeyValue = "000000000";
+    Path[] fileList =
+        FileUtil.stat2Paths(fileSystem.listStatus(OUTPUT,
+            new Utils.OutputFileUtils.OutputFilesFilter()));
+    for (Path outFile : fileList) {
+      try {
+        dis = fileSystem.open(outFile);
+        String record;
+        while((record = dis.readLine()) != null) {
+          // Split the line into key and value.
+          int blankPos = record.indexOf(" ");
+          String keyString = record.substring(0, blankPos);
+          String valueString = record.substring(blankPos+1);
+          // Check for sorted output and correctness of record.
+          if (keyString.compareTo(prevKeyValue) >= 0
+              && keyString.equals(valueString)) {
+            prevKeyValue = keyString;
+            numValidRecords++;
+          } else {
+            numInvalidRecords++;
+          }
+        }
+      } finally {
+        if (dis != null) {
+          dis.close();
+          dis = null;
+        }
+      }
+    }
+    // Make sure we got all input records in the output in sorted order.
+    assertEquals((long)(numMappers * numLines), numValidRecords);
+    // Make sure there is no extraneous invalid record.
+    assertEquals(0, numInvalidRecords);
+  }
+
+  /**
+   * A mapper implementation that assumes that key text contains valid integers
+   * in displayable form.
+   */
+  public static class MyMapper extends MapReduceBase
+      implements Mapper<LongWritable, Text, Text, Text> {
+    private Text keyText;
+    private Text valueText;
+
+    public MyMapper() {
+      keyText = new Text();
+      valueText = new Text();
+    }
+
+    @Override
+    public void map(LongWritable key, Text value,
+        OutputCollector<Text, Text> output,
+        Reporter reporter) throws IOException {
+      String record = value.toString();
+      int blankPos = record.indexOf(" ");
+      keyText.set(record.substring(0, blankPos));
+      valueText.set(record.substring(blankPos+1));
+      output.collect(keyText, valueText);
+    }
+
+    public void close() throws IOException {
+    }
+  }
+
+  /**
+   * Partitioner implementation to make sure that output is in total sorted
+   * order.  We basically route key ranges to different reducers such that
+   * key values monotonically increase with the partition number.  For example,
+   * in a test with 4 reducers, the keys are numbers from 1 to 1000 in the
+   * form "000000001" to "000001000" in each input file. The keys "000000001"
+   * to "000000250" are routed to partition 0, "000000251" to "000000500" are
+   * routed to partition 1, and so on.
+   */
+  static class MyPartitioner implements Partitioner<Text, Text> {
+
+    private JobConf job;
+
+    public MyPartitioner() {
+    }
+
+    public void configure(JobConf jobConf) {
+      this.job = jobConf;
+    }
+
+    public int getPartition(Text key, Text value, int numPartitions) {
+      int keyValue = 0;
+      try {
+        keyValue = Integer.parseInt(key.toString());
+      } catch(NumberFormatException nfe) {
+        keyValue = 0;
+      }
+      int partitionNumber = (numPartitions * (Math.max(0, keyValue - 1))) /
+          job.getInt("mapred.test.num_lines", 10000);
+      return partitionNumber;
+    }
+  }
+
+}