HADOOP-1965. Reverted the patch due to the problem reported in HADOOP-2419

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@604275 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das, 17 years ago
commit 32f6e57df0

+ 0 - 3
CHANGES.txt

@@ -122,9 +122,6 @@ Trunk (unreleased changes)
     HADOOP-1898.  Release the lock protecting the last time of the last stack
     dump while the dump is happening. (Amareshwari Sri Ramadasu via omalley)
 
-    HADOOP-1965. Makes the sortAndSpill in MapTask a separate thread.
-    (Amar Kamat via ddas)
-
     HADOOP-1900. Makes the heartbeat and task event queries interval 
     dependent on the cluster size.  (Amareshwari Sri Ramadasu via ddas)
 

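For context on the diffs below: the reverted HADOOP-1965 patch halved the in-memory collect buffer so that sorting and spilling could run on a daemon thread while the map kept collecting records; this commit restores the single-buffer design in which collect() spills synchronously. A minimal sketch of the two patterns, using hypothetical names rather than the actual MapTask code:

    import java.io.ByteArrayOutputStream;

    // Toy illustration only: contrasts the synchronous spill restored by this
    // revert with the double-buffered, threaded spill of HADOOP-1965.
    public class SpillSketch {
      static final int MAX_BUFFER = 1 << 20;   // stand-in for io.sort.mb

      private ByteArrayOutputStream buf = new ByteArrayOutputStream();
      private Thread spiller;                  // in-flight background spill, if any

      // Restored design: when the buffer fills, sort and spill inline.
      void collectSynchronous(byte[] rec) {
        buf.write(rec, 0, rec.length);
        if (buf.size() >= MAX_BUFFER) {
          sortAndSpill(buf.toByteArray());     // the map blocks until the spill finishes
          buf.reset();
        }
      }

      // Reverted design: half the budget per buffer, spill on a daemon thread.
      void collectThreaded(byte[] rec) throws InterruptedException {
        buf.write(rec, 0, rec.length);
        if (buf.size() >= MAX_BUFFER / 2) {
          if (spiller != null) spiller.join(); // at most one spill in flight
          final byte[] pending = buf.toByteArray();
          buf = new ByteArrayOutputStream();   // collection continues immediately
          spiller = new Thread(() -> sortAndSpill(pending), "SortSpillThread");
          spiller.setDaemon(true);
          spiller.start();
        }
      }

      private void sortAndSpill(byte[] data) {
        // sort the records and write a spill file; elided in this sketch
      }
    }

The threaded variant buys overlap between collection and spilling at the price of halving each buffer and of the locking visible in the reverted hunks below.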
+ 73 - 164
src/java/org/apache/hadoop/mapred/MapTask.java

@@ -255,13 +255,7 @@ class MapTask extends Task {
 
     private DataOutputBuffer keyValBuffer; //the buffer where key/val will
                                            //be stored before they are 
-                                           //passed on to the pending buffer
-    private DataOutputBuffer pendingKeyvalBuffer; // the key value buffer used  
-                                                  // while spilling
-    private IOException sortSpillException; //since sort-spill and collect are
-                                            //done concurrently, exceptions are
-                                            //passed through shared variables
-    private final Object sortSpillExceptionLock = new Object();
+                                           //spilled to disk
     private int maxBufferSize; //the max amount of in-memory space after which
                                //we will spill the keyValBuffer to disk
     private int numSpills; //maintains the no. of spills to disk done so far
@@ -273,7 +267,6 @@ class MapTask extends Task {
     private Class valClass;
     private WritableComparator comparator;
     private BufferSorter []sortImpl;
-    private BufferSorter []pendingSortImpl; // sort impl for the pending buffer
     private SequenceFile.Writer writer;
     private FSDataOutputStream out;
     private FSDataOutputStream indexOut;
@@ -283,10 +276,7 @@ class MapTask extends Task {
       this.partitions = job.getNumReduceTasks();
       this.partitioner = (Partitioner)ReflectionUtils.newInstance(
                                                                   job.getPartitionerClass(), job);
-      // using one half the total buffer for collecting key-value pairs and 
-      // the other half for sort-spill thus making the two tasks concurrent
-      maxBufferSize = job.getInt("io.sort.mb", 100) * 1024 * 1024 / 2;
-      this.sortSpillException = null;
+      maxBufferSize = job.getInt("io.sort.mb", 100) * 1024 * 1024;
       keyValBuffer = new DataOutputBuffer();
 
       this.job = job;
@@ -348,149 +338,94 @@ class MapTask extends Task {
                               + value.getClass().getName());
       }
       
-      // check if the earlier sort-spill generated an exception
-      synchronized (sortSpillExceptionLock) {
-        if (sortSpillException != null) {
-          throw sortSpillException;
+      synchronized (this) {
+        if (keyValBuffer == null) {
+          keyValBuffer = new DataOutputBuffer();
         }
-      }
-      if (keyValBuffer == null) {
-        keyValBuffer = new DataOutputBuffer();
-        sortImpl = new BufferSorter[partitions];
+        //dump the key/value to buffer
+        int keyOffset = keyValBuffer.getLength(); 
+        key.write(keyValBuffer);
+        int keyLength = keyValBuffer.getLength() - keyOffset;
+        value.write(keyValBuffer);
+        int valLength = keyValBuffer.getLength() - (keyOffset + keyLength);
+      
+        int partNumber = partitioner.getPartition(key, value, partitions);
+        sortImpl[partNumber].addKeyValue(keyOffset, keyLength, valLength);
+
+        reporter.incrCounter(MAP_OUTPUT_RECORDS, 1);
+        reporter.incrCounter(MAP_OUTPUT_BYTES,
+                             (keyValBuffer.getLength() - keyOffset));
+
+        //now check whether we need to spill to disk
+        long totalMem = 0;
         for (int i = 0; i < partitions; i++)
-          sortImpl[i] = (BufferSorter)ReflectionUtils.newInstance(
-                                job.getClass("map.sort.class", 
-                                             MergeSorter.class, 
-                                             BufferSorter.class), job);
-      }
-      //dump the key/value to buffer
-      int keyOffset = keyValBuffer.getLength(); 
-      key.write(keyValBuffer);
-      int keyLength = keyValBuffer.getLength() - keyOffset;
-      value.write(keyValBuffer);
-      int valLength = keyValBuffer.getLength() - (keyOffset + keyLength);
-      int partNumber = partitioner.getPartition(key, value, partitions);
-      sortImpl[partNumber].addKeyValue(keyOffset, keyLength, valLength);
-      reporter.incrCounter(MAP_OUTPUT_RECORDS, 1);
-      reporter.incrCounter(MAP_OUTPUT_BYTES, 
-                           (keyValBuffer.getLength() - keyOffset));
-
-      //now check whether we need to spill to disk
-      long totalMem = 0;
-      for (int i = 0; i < partitions; i++)
-        totalMem += sortImpl[i].getMemoryUtilized();
-      totalMem += keyValBuffer.getLength();
-      if (totalMem  >= maxBufferSize) {
-        // check if the earlier spill is pending
-        synchronized (this) {
-          while (pendingKeyvalBuffer != null) {
-            try {            
-              wait(); // wait for the pending spill to finish
-            } catch (InterruptedException ie) {
-              LOG.warn("Buffer interrupted while waiting for the writer", ie);
-            }
-          }
-        }
-        // check if the earlier sort-spill thread generated an exception
-        synchronized (sortSpillExceptionLock) {
-          if (sortSpillException != null) {
-            throw sortSpillException;
-          }
-        }
-        // prepare for spilling
-        synchronized (this) {
-          pendingKeyvalBuffer = keyValBuffer;
-          pendingSortImpl = sortImpl;
+          totalMem += sortImpl[i].getMemoryUtilized();
+        if ((keyValBuffer.getLength() + totalMem) >= maxBufferSize) {
+          sortAndSpillToDisk();
+          //we don't reuse the keyValBuffer. We want to maintain consistency
+          //in the memory model (for negligible performance loss).
           keyValBuffer = null;
-          sortImpl = null;
-        }
-        // Start the sort-spill thread. While the sort and spill takes place 
-        // using the pending variables, the output collector can collect the 
-        // key-value without getting blocked. Thus making key-value collection 
-        // and sort-spill concurrent.
-        Thread bufferWriter = new Thread() {
-          public void run() {
-            sortAndSpillToDisk();
+          for (int i = 0; i < partitions; i++) {
+            sortImpl[i].close();
           }
-        };
-        bufferWriter.setDaemon(true); // to make sure that the buffer writer 
-                                      // gets killed if collector gets killed.
-        bufferWriter.setName("SortSpillThread");
-        bufferWriter.start();
+        }
       }
     }
     
     //sort, combine and spill to disk
-    private void sortAndSpillToDisk() {
-      try {
+    private void sortAndSpillToDisk() throws IOException {
+      synchronized (this) {
         //approximate the length of the output file to be the length of the
         //buffer + header lengths for the partitions
-        synchronized (this) {
-          long size = pendingKeyvalBuffer.getLength() 
-                      + partitions  * APPROX_HEADER_LENGTH;
-          Path filename = mapOutputFile.getSpillFileForWrite(getTaskId(), 
-                                                             numSpills, size);
-          //we just create the FSDataOutputStream object here.
-          out = localFs.create(filename);
-          Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
-                                       getTaskId(), numSpills, 
-                                       partitions * 16);
-          indexOut = localFs.create(indexFilename);
-          LOG.debug("opened " 
-                    + mapOutputFile.getSpillFile(getTaskId(), 
-                                                 numSpills).getName());
-          //invoke the sort
-          for (int i = 0; i < partitions; i++) {
-            pendingSortImpl[i].setInputBuffer(pendingKeyvalBuffer);
-            pendingSortImpl[i].setProgressable(reporter);
-            RawKeyValueIterator rIter = pendingSortImpl[i].sort();
-						
-            startPartition(i);
-            if (rIter != null) {
-              //invoke the combiner if one is defined
-              if (job.getCombinerClass() != null) {
-                //We instantiate and close the combiner for each partition. 
-                //This is required for streaming where the combiner runs as a 
-                //separate process and we want to make sure that the combiner 
-                //process has got all the input key/val, processed, and output 
-                //the result key/vals before we write the partition header in 
-                //the output file.
-                Reducer combiner = (Reducer)ReflectionUtils.newInstance(
-                                                    job.getCombinerClass(), 
-                                                    job);
-                // make collector
-                OutputCollector combineCollector = new OutputCollector() {
+        long size = keyValBuffer.getLength() + 
+                    partitions * APPROX_HEADER_LENGTH;
+        Path filename = mapOutputFile.getSpillFileForWrite(getTaskId(), 
+                                      numSpills, size);
+        //we just create the FSDataOutputStream object here.
+        out = localFs.create(filename);
+        Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
+                             getTaskId(), numSpills, partitions * 16);
+        indexOut = localFs.create(indexFilename);
+        LOG.debug("opened "+
+                  mapOutputFile.getSpillFile(getTaskId(), numSpills).getName());
+          
+        //invoke the sort
+        for (int i = 0; i < partitions; i++) {
+          sortImpl[i].setInputBuffer(keyValBuffer);
+          sortImpl[i].setProgressable(reporter);
+          RawKeyValueIterator rIter = sortImpl[i].sort();
+          
+          startPartition(i);
+          if (rIter != null) {
+            //invoke the combiner if one is defined
+            if (job.getCombinerClass() != null) {
+              //we instantiate and close the combiner for each partition. This
+              //is required for streaming where the combiner runs as a separate
+              //process and we want to make sure that the combiner process has
+              //got all the input key/val, processed, and output the result 
+              //key/vals before we write the partition header in the output file
+              Reducer combiner = (Reducer)ReflectionUtils.newInstance(
+                                                                      job.getCombinerClass(), job);
+              // make collector
+              OutputCollector combineCollector = new OutputCollector() {
                   public void collect(WritableComparable key, Writable value)
-                  throws IOException {
+                    throws IOException {
                     synchronized (this) {
                       writer.append(key, value);
                     }
                   }
                 };
-                combineAndSpill(rIter, combiner, combineCollector);
-                combiner.close();
-              }
-              else //just spill the sorted data
-                spill(rIter);
+              combineAndSpill(rIter, combiner, combineCollector);
+              combiner.close();
             }
-            endPartition(i);
-          }
-          numSpills++;
-          out.close();
-          indexOut.close();
-        }
-      } catch (IOException ioe) {
-        synchronized (sortSpillExceptionLock) {
-          sortSpillException = ioe;
-        }
-      } finally { // make sure that the collector never waits indefinitely
-        synchronized (this) {
-          pendingKeyvalBuffer = null;
-          for (int i = 0; i < partitions; i++) {
-            pendingSortImpl[i].close();
+            else //just spill the sorted data
+              spill(rIter);
           }
-          this.notify();
+          endPartition(i);
         }
+        numSpills++;
+        out.close();
+        indexOut.close();
       }
     }
     
@@ -664,35 +599,9 @@ class MapTask extends Task {
       //check whether the length of the key/value buffer is 0. If not, then
       //we need to spill that to disk. Note that we reset the key/val buffer
       //upon each spill (so a length > 0 means that we have not spilled yet)
-      
-      // check if the earlier spill is pending
       synchronized (this) {
-        while (pendingKeyvalBuffer != null) {
-          try {
-            wait();
-          } catch (InterruptedException ie) {
-            LOG.info("Buffer interrupted while for the pending spill", ie);
-          }
-        }
-      }
-      // check if the earlier sort-spill thread generated an exception
-      synchronized (sortSpillExceptionLock) {
-        if (sortSpillException != null) {
-          throw sortSpillException;
-        }
-      }
-      // prepare for next spill
-      if (keyValBuffer != null && keyValBuffer.getLength() > 0) {
-        synchronized (this) {
-          pendingKeyvalBuffer = keyValBuffer;
-          pendingSortImpl = sortImpl;
-        }
-        sortAndSpillToDisk();
-        // check if the last sort-spill thread generated an exception
-        synchronized (sortSpillExceptionLock) {
-          if (sortSpillException != null) {
-            throw sortSpillException;
-          }
+        if (keyValBuffer != null && keyValBuffer.getLength() > 0) {
+          sortAndSpillToDisk();
         }
       }
       mergeParts();

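The buffer arithmetic restored above is easy to check: with the default io.sort.mb of 100, the reverted patch gave each of the alternating buffers half the budget, while this commit hands the whole budget back to the single keyValBuffer. A standalone sketch of the numbers (not Hadoop code):

    // Spill-buffer budget before and after this revert, at the io.sort.mb default.
    public class BufferBudget {
      public static void main(String[] args) {
        int ioSortMb = 100;                         // job.getInt("io.sort.mb", 100)
        int threaded = ioSortMb * 1024 * 1024 / 2;  // HADOOP-1965: 52,428,800 bytes per half
        int reverted = ioSortMb * 1024 * 1024;      // this commit: 104,857,600 bytes, one buffer
        System.out.println("threaded half-buffer: " + threaded + " bytes");
        System.out.println("reverted buffer:      " + reverted + " bytes");
      }
    }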
+ 0 - 350
src/test/org/apache/hadoop/mapred/ThreadedMapBenchmark.java

@@ -1,350 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.mapred;
-
-import java.io.IOException;
-import java.io.File;
-import java.util.Random;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.examples.RandomWriter;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.SortValidator.RecordStatsChecker.NonSplitableSequenceFileInputFormat;
-import org.apache.hadoop.mapred.lib.IdentityMapper;
-import org.apache.hadoop.mapred.lib.IdentityReducer;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-
-/**
- * Distributed threaded map benchmark.
- * <p>
- * This benchmark generates random data per map and tests the performance 
- * of having multiple spills (using multiple threads) over having just one 
- * spill.
- * <li>File size per map is specified as a parameter to the benchmark.
- * <li>Number of spills per map is specified as a parameter to the benchmark. 
- * <li>Number of maps per host is also specified as a parameter to the benchmark.
- * <p>
- * The result of this benchmark is reported in the form of the time required 
- * to run the complete job in either case. RandomWriter is used for generating 
- * the input data. Sort is used for benchmarking.
- */
-
-public class ThreadedMapBenchmark extends Configured implements Tool {
-
-  private static final Log LOG = LogFactory.getLog(ThreadedMapBenchmark.class);
-  private static Path BASE_DIR =
-    new Path(System.getProperty("test.build.data", 
-                                File.separator + "benchmarks" + File.separator 
-                                + "ThreadedMapBenchmark"));
-  private static Path INPUT_DIR = new Path(BASE_DIR, "input");
-  private static Path OUTPUT_DIR = new Path(BASE_DIR, "output");
-  private static final float FACTOR = 2.3f; // io.sort.mb set to 
-                                            // (FACTOR * data_size) should 
-                                            // result in only 1 spill
-
-  static enum Counters { RECORDS_WRITTEN, BYTES_WRITTEN }
-  
-  /**
-   * A custom input format that creates virtual inputs of a single string
-   * for each map. Using {@link RandomWriter} code. 
-   */
-  public static class RandomInputFormat implements InputFormat<Text, Text> {
-    
-    public void validateInput(JobConf job) throws IOException {
-    }
-
-    public InputSplit[] getSplits(JobConf job, 
-                                  int numSplits) throws IOException {
-      InputSplit[] result = new InputSplit[numSplits];
-      Path outDir = job.getOutputPath();
-      for(int i=0; i < result.length; ++i) {
-        result[i] = new FileSplit(new Path(outDir, "dummy-split-" + i), 0, 1, 
-                                  job);
-      }
-      return result;
-    }
-
-    static class RandomRecordReader implements RecordReader<Text, Text> {
-      Path name;
-      public RandomRecordReader(Path p) {
-        name = p;
-      }
-      public boolean next(Text key, Text value) {
-        if (name != null) {
-          key.set(name.getName());
-          name = null;
-          return true;
-        }
-        return false;
-      }
-      public Text createKey() {
-        return new Text();
-      }
-      public Text createValue() {
-        return new Text();
-      }
-      public long getPos() {
-        return 0;
-      }
-      public void close() {}
-      public float getProgress() {
-        return 0.0f;
-      }
-    }
-
-    public RecordReader<Text, Text> getRecordReader(InputSplit split,
-                                                    JobConf job, 
-                                                    Reporter reporter) 
-    throws IOException {
-      return new RandomRecordReader(((FileSplit) split).getPath());
-    }
-  }
-
-  /**
-   * Generates random input data of given size with keys and values of given 
-   * sizes. By default it generates 128mb input data with 10 byte keys and 10 
-   * byte values.
-   */
-  public static class Map extends MapReduceBase
-  implements Mapper<WritableComparable, Writable,
-                    BytesWritable, BytesWritable> {
-  
-  private long numBytesToWrite;
-  private int minKeySize;
-  private int keySizeRange;
-  private int minValueSize;
-  private int valueSizeRange;
-  private Random random = new Random();
-  private BytesWritable randomKey = new BytesWritable();
-  private BytesWritable randomValue = new BytesWritable();
-  
-  private void randomizeBytes(byte[] data, int offset, int length) {
-    for(int i = offset + length - 1; i >= offset; --i) {
-      data[i] = (byte) random.nextInt(256);
-    }
-  }
-  
-  public void map(WritableComparable key, 
-                  Writable value,
-                  OutputCollector<BytesWritable, BytesWritable> output, 
-                  Reporter reporter) throws IOException {
-    int itemCount = 0;
-    while (numBytesToWrite > 0) {
-      int keyLength = minKeySize 
-                      + (keySizeRange != 0 
-                         ? random.nextInt(keySizeRange) 
-                         : 0);
-      randomKey.setSize(keyLength);
-      randomizeBytes(randomKey.get(), 0, randomKey.getSize());
-      int valueLength = minValueSize 
-                        + (valueSizeRange != 0 
-                           ? random.nextInt(valueSizeRange) 
-                           : 0);
-      randomValue.setSize(valueLength);
-      randomizeBytes(randomValue.get(), 0, randomValue.getSize());
-      output.collect(randomKey, randomValue);
-      numBytesToWrite -= keyLength + valueLength;
-      reporter.incrCounter(Counters.BYTES_WRITTEN, 1);
-      reporter.incrCounter(Counters.RECORDS_WRITTEN, 1);
-      if (++itemCount % 200 == 0) {
-        reporter.setStatus("wrote record " + itemCount + ". " 
-                           + numBytesToWrite + " bytes left.");
-      }
-    }
-    reporter.setStatus("done with " + itemCount + " records.");
-  }
-  
-  @Override
-  public void configure(JobConf job) {
-    numBytesToWrite = job.getLong("test.tmb.bytes_per_map",
-                                  128 * 1024 * 1024);
-    minKeySize = job.getInt("test.tmb.min_key", 10);
-    keySizeRange = job.getInt("test.tmb.max_key", 10) - minKeySize;
-    minValueSize = job.getInt("test.tmb.min_value", 10);
-    valueSizeRange = job.getInt("test.tmb.max_value", 10) - minValueSize;
-  }
-}
-
-  /**
-   * Generate input data for the benchmark
-   */
-  public static void generateInputData(int dataSizePerMap, 
-                                       int numSpillsPerMap, 
-                                       int numMapsPerHost, 
-                                       JobConf masterConf) 
-  throws Exception { 
-    JobConf job = new JobConf(masterConf, ThreadedMapBenchmark.class);
-    job.setJobName("threaded-map-benchmark-random-writer");
-    job.setJarByClass(ThreadedMapBenchmark.class);
-    job.setInputFormat(RandomInputFormat.class);
-    job.setOutputFormat(SequenceFileOutputFormat.class);
-    
-    job.setMapperClass(Map.class);
-    job.setReducerClass(IdentityReducer.class);
-    
-    job.setOutputKeyClass(BytesWritable.class);
-    job.setOutputValueClass(BytesWritable.class);
-    
-    JobClient client = new JobClient(job);
-    ClusterStatus cluster = client.getClusterStatus();
-    long totalDataSize = dataSizePerMap * numMapsPerHost 
-                         * cluster.getTaskTrackers();
-    job.set("test.tmb.bytes_per_map", 
-            String.valueOf(dataSizePerMap * 1024 * 1024));
-    job.setNumReduceTasks(0); // none reduce
-    job.setNumMapTasks(numMapsPerHost * cluster.getTaskTrackers());
-    job.setOutputPath(INPUT_DIR);
-    
-    FileSystem fs = FileSystem.get(job);
-    fs.delete(BASE_DIR);
-    
-    LOG.info("Generating random input for the benchmark");
-    LOG.info("Total data : " + totalDataSize + " mb");
-    LOG.info("Data per map: " + dataSizePerMap + " mb");
-    LOG.info("Number of spills : " + numSpillsPerMap);
-    LOG.info("Number of maps per host : " + numMapsPerHost);
-    LOG.info("Number of hosts : " + cluster.getTaskTrackers());
-    
-    JobClient.runJob(job); // generates the input for the benchmark
-  }
-
-  /**
-   * This is the main routine for launching the benchmark. It generates random 
-   * input data. The input is non-splittable. Sort is used for benchmarking. 
-   * This benchmark reports the effect of having multiple sort and spill 
-   * cycles over a single sort and spill. 
-   * 
-   * @throws IOException 
-   */
-  public int run (String[] args) throws Exception {
-    LOG.info("Starting the benchmark for threaded spills");
-    String version = "ThreadedMapBenchmark.0.0.1";
-    System.out.println(version);
-    
-    String usage = 
-      "Usage: threadedmapbenchmark " +
-      "[-dataSizePerMap <data size (in mb) per map, default is 128 mb>] " + 
-      "[-numSpillsPerMap <number of spills per map, default is 2>] " +
-      "[-numMapsPerHost <number of maps per host, default is 1>]";
-    
-    int dataSizePerMap = 128; // in mb
-    int numSpillsPerMap = 2;
-    int numMapsPerHost = 1;
-    JobConf masterConf = new JobConf(getConf());
-    
-    for (int i = 0; i < args.length; i++) { // parse command line
-      if (args[i].equals("-dataSizePerMap")) {
-        dataSizePerMap = Integer.parseInt(args[++i]);
-      } else if (args[i].equals("-numSpillsPerMap")) {
-        numSpillsPerMap = Integer.parseInt(args[++i]);
-      } else if (args[i].equals("-numMapsPerHost")) {
-        numMapsPerHost = Integer.parseInt(args[++i]);
-      } else {
-        System.err.println(usage);
-        System.exit(-1);
-      }
-    }
-    
-    if (dataSizePerMap <  1 ||  // verify arguments
-        numSpillsPerMap < 1 ||
-        numMapsPerHost < 1)
-      {
-        System.err.println(usage);
-        System.exit(-1);
-      }
-    
-    FileSystem fs = null;
-    try {
-      // using random-writer to generate the input data
-      generateInputData(dataSizePerMap, numSpillsPerMap, numMapsPerHost, 
-                        masterConf);
-      
-      // configure job for sorting
-      JobConf job = new JobConf(masterConf, ThreadedMapBenchmark.class);
-      job.setJobName("threaded-map-benchmark-unspilled");
-      job.setJarByClass(ThreadedMapBenchmark.class);
-
-      job.setInputFormat(NonSplitableSequenceFileInputFormat.class);
-      job.setOutputFormat(SequenceFileOutputFormat.class);
-      
-      job.setOutputKeyClass(BytesWritable.class);
-      job.setOutputValueClass(BytesWritable.class);
-      
-      job.setMapperClass(IdentityMapper.class);        
-      job.setReducerClass(IdentityReducer.class);
-      
-      job.addInputPath(INPUT_DIR);
-      job.setOutputPath(OUTPUT_DIR);
-      
-      JobClient client = new JobClient(job);
-      ClusterStatus cluster = client.getClusterStatus();
-      job.setNumMapTasks(numMapsPerHost * cluster.getTaskTrackers());
-      job.setNumReduceTasks(1);
-      
-      // set io.sort.mb to avoid spill
-      int ioSortMb = (int)Math.ceil(FACTOR * dataSizePerMap);
-      job.set("io.sort.mb", String.valueOf(ioSortMb));
-      fs = FileSystem.get(job);
-      
-      LOG.info("Running sort with 1 spill per map");
-      long startTime = System.currentTimeMillis();
-      JobClient.runJob(job);
-      long endTime = System.currentTimeMillis();
-      
-      LOG.info("Total time taken : " + String.valueOf(endTime - startTime) 
-               + " millisec");
-      fs.delete(OUTPUT_DIR);
-      
-      // set io.sort.mb to have multiple spills
-      JobConf spilledJob = new JobConf(job, ThreadedMapBenchmark.class);
-      ioSortMb = (int)Math.ceil(FACTOR 
-                                * Math.ceil((double)dataSizePerMap 
-                                            / numSpillsPerMap));
-      spilledJob.set("io.sort.mb", String.valueOf(ioSortMb));
-      spilledJob.setJobName("threaded-map-benchmark-spilled");
-      spilledJob.setJarByClass(ThreadedMapBenchmark.class);
-      
-      LOG.info("Running sort with " + numSpillsPerMap + " spills per map");
-      startTime = System.currentTimeMillis();
-      JobClient.runJob(spilledJob);
-      endTime = System.currentTimeMillis();
-      
-      LOG.info("Total time taken : " + String.valueOf(endTime - startTime) 
-               + " millisec");
-    } finally {
-      if (fs != null) {
-        fs.delete(BASE_DIR);
-      }
-    }
-    return 0;
-  }
-
-  public static void main(String[] args) throws Exception {
-    int res = ToolRunner.run(new ThreadedMapBenchmark(), args);
-    System.exit(res);
-  }
-}

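The deleted benchmark never counted spills directly; it induced the desired number by sizing io.sort.mb, setting it to FACTOR (2.3) times the per-map data size for a single spill, and dividing the data size by the target spill count first to force several. The arithmetic, reproduced as a standalone sketch with the benchmark's defaults:

    // Reproduces the deleted benchmark's io.sort.mb arithmetic.
    public class SpillTuning {
      static final float FACTOR = 2.3f;  // io.sort.mb >= FACTOR * data size => 1 spill

      public static void main(String[] args) {
        int dataSizePerMap = 128;   // mb, the benchmark's default
        int numSpillsPerMap = 2;    // the benchmark's default

        int oneSpill = (int) Math.ceil(FACTOR * dataSizePerMap);            // 295 mb
        int manySpills = (int) Math.ceil(
            FACTOR * Math.ceil((double) dataSizePerMap / numSpillsPerMap)); // 148 mb

        System.out.println("io.sort.mb for 1 spill:  " + oneSpill);
        System.out.println("io.sort.mb for " + numSpillsPerMap + " spills: " + manySpills);
      }
    }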
+ 0 - 4
src/test/org/apache/hadoop/test/AllTestDriver.java

@@ -36,7 +36,6 @@ import org.apache.hadoop.io.TestSetFile;
 import org.apache.hadoop.io.TestSequenceFile;
 import org.apache.hadoop.ipc.TestIPC;
 import org.apache.hadoop.ipc.TestRPC;
-import org.apache.hadoop.mapred.ThreadedMapBenchmark;
 
 public class AllTestDriver {
   
@@ -46,9 +45,6 @@ public class AllTestDriver {
   public static void main(String argv[]){
     ProgramDriver pgd = new ProgramDriver();
     try {
-      pgd.addClass("threadedmapbench", ThreadedMapBenchmark.class, 
-                   "A map/reduce benchmark that compares the performance " + 
-                   "of maps with multiple spills over maps with 1 spill");
       pgd.addClass("mrbench", MRBench.class, "A map/reduce benchmark that can create many small jobs");
       pgd.addClass("mrbench", MRBench.class, "A map/reduce benchmark that can create many small jobs");
       pgd.addClass("nnbench", NNBench.class, "A benchmark that stresses the namenode.");
       pgd.addClass("nnbench", NNBench.class, "A benchmark that stresses the namenode.");
       pgd.addClass("mapredtest", TestMapRed.class, "A map/reduce test check.");
       pgd.addClass("mapredtest", TestMapRed.class, "A map/reduce test check.");