@@ -353,7 +353,7 @@ class MapTask extends Task {
       ReflectionUtils.newInstance(job.getMapRunnerClass(), job);

     try {
-      runner.run(in, collector, reporter);
+      runner.run(in, new OldOutputCollector(collector, conf), reporter);
       collector.flush();
     } finally {
       //close
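The old-API path above is the only call site that changes: MapRunner still hands user mappers a plain two-argument OutputCollector, and the partition is now computed inside the wrapper introduced here (defined in the next hunk) rather than inside the output buffer. For illustration, a hypothetical word-count-style mapper on the deprecated mapred API is untouched by this patch:

    // Hypothetical old-API mapper, unaffected by the change: its
    // two-argument collect() now lands in OldOutputCollector, which
    // computes the partition before forwarding to the MapOutputCollector.
    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;

    public class WordTokenMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, IntWritable> {
      private static final IntWritable ONE = new IntWritable(1);
      private final Text word = new Text();

      public void map(LongWritable key, Text value,
                      OutputCollector<Text, IntWritable> output,
                      Reporter reporter) throws IOException {
        for (String token : value.toString().split("\\s+")) {
          word.set(token);
          output.collect(word, ONE);  // partition chosen by the wrapper
        }
      }
    }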
@@ -427,20 +427,80 @@ class MapTask extends Task {
     }
   }

+  /**
+   * Since the mapred and mapreduce Partitioners don't share a common interface
+   * (JobConfigurable is deprecated and a subtype of mapred.Partitioner), the
+   * partitioner lives in Old/NewOutputCollector. Note that, for map-only jobs,
+   * the configured partitioner should not be called. It's common for
+   * partitioners to compute a result mod numReduces, which causes a div0 error
+   */
+  private static class OldOutputCollector<K,V> implements OutputCollector<K,V> {
+    private final Partitioner<K,V> partitioner;
+    private final MapOutputCollector<K,V> collector;
+    private final int numPartitions;
+
+    @SuppressWarnings("unchecked")
+    OldOutputCollector(MapOutputCollector<K,V> collector, JobConf conf) {
+      numPartitions = conf.getNumReduceTasks();
+      if (numPartitions > 0) {
+        partitioner = (Partitioner<K,V>)
+          ReflectionUtils.newInstance(conf.getPartitionerClass(), conf);
+      } else {
+        partitioner = new Partitioner<K,V>() {
+          @Override
+          public void configure(JobConf job) { }
+          @Override
+          public int getPartition(K key, V value, int numPartitions) {
+            return -1;
+          }
+        };
+      }
+      this.collector = collector;
+    }
+
+    @Override
+    public void collect(K key, V value) throws IOException {
+      try {
+        collector.collect(key, value,
+                          partitioner.getPartition(key, value, numPartitions));
+      } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt();
+        throw new IOException("interrupt exception", ie);
+      }
+    }
+  }
+
   private class NewOutputCollector<K,V>
     extends org.apache.hadoop.mapreduce.RecordWriter<K,V> {
-    private MapOutputCollector<K,V> collector;
+    private final MapOutputCollector<K,V> collector;
+    private final org.apache.hadoop.mapreduce.Partitioner<K,V> partitioner;
+    private final int partitions;

-    NewOutputCollector(JobConf job,
+    @SuppressWarnings("unchecked")
+    NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
+                       JobConf job,
                        TaskUmbilicalProtocol umbilical,
                        TaskReporter reporter
                        ) throws IOException, ClassNotFoundException {
       collector = new MapOutputBuffer<K,V>(umbilical, job, reporter);
+      partitions = jobContext.getNumReduceTasks();
+      if (partitions > 0) {
+        partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
+          ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
+      } else {
+        partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
+          @Override
+          public int getPartition(K key, V value, int numPartitions) {
+            return -1;
+          }
+        };
+      }
     }

     @Override
-    public void write(K key, V value) throws IOException {
-      collector.collect(key, value);
+    public void write(K key, V value) throws IOException, InterruptedException {
+      collector.collect(key, value,
+                        partitioner.getPartition(key, value, partitions));
     }

     @Override
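The div0 remark in the Javadoc above is the reason for the dummy-partitioner branches: user partitioners almost always reduce mod numReduceTasks, so instantiating the configured class in a map-only job (numReduceTasks == 0) invites an ArithmeticException on the first call. Note also that OldOutputCollector.collect() re-asserts the interrupt flag before translating InterruptedException into IOException, since the old OutputCollector signature declares only IOException. For reference, the stock mapred HashPartitioner is essentially this:

    // The default mapred HashPartitioner, paraphrased from the shipped
    // class: the modulo below is why getPartition() must never be called
    // with numReduceTasks == 0.
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.Partitioner;

    public class HashPartitioner<K2, V2> implements Partitioner<K2, V2> {
      public void configure(JobConf job) {}

      public int getPartition(K2 key, V2 value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
      }
    }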
@@ -510,7 +570,7 @@ class MapTask extends Task {
       if (job.getNumReduceTasks() == 0) {
         output = outputFormat.getRecordWriter(taskContext);
       } else {
-        output = new NewOutputCollector(job, umbilical, reporter);
+        output = new NewOutputCollector(taskContext, job, umbilical, reporter);
       }

       mapperContext = contextConstructor.newInstance(mapper, job, getTaskID(),
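This hunk threads the new-API taskContext into NewOutputCollector so the constructor can resolve the partitioner via jobContext.getPartitionerClass(). The zero-reduce branch sidesteps the collector (and therefore the partitioner) entirely: records go straight to the OutputFormat's RecordWriter. A hypothetical job configuration that takes that path:

    // Hypothetical map-only job: setNumReduceTasks(0) selects the
    // getRecordWriter() branch above, so NewOutputCollector is never built.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    public class MapOnlyJob {
      public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "map-only-pass-through");
        job.setMapperClass(Mapper.class);  // identity mapper
        job.setNumReduceTasks(0);
        job.setOutputFormatClass(TextOutputFormat.class);
        // input/output paths and job submission omitted for brevity
      }
    }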
@@ -532,9 +592,10 @@ class MapTask extends Task {
     }
   }

-  interface MapOutputCollector<K, V>
-      extends OutputCollector<K, V> {
+  interface MapOutputCollector<K, V> {

+    public void collect(K key, V value, int partition
+                        ) throws IOException, InterruptedException;
     public void close() throws IOException, InterruptedException;

     public void flush() throws IOException, InterruptedException,
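Pieced together, the internal collector contract after this hunk no longer extends the public mapred OutputCollector and carries the partition explicitly. Reassembled from the diff (the continuation of the flush() throws clause lies beyond the hunk's context lines, so its tail is an assumption):

    interface MapOutputCollector<K, V> {

      public void collect(K key, V value, int partition
                          ) throws IOException, InterruptedException;
      public void close() throws IOException, InterruptedException;

      public void flush() throws IOException, InterruptedException,
                                 ClassNotFoundException;
    }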
@@ -574,7 +635,7 @@ class MapTask extends Task {
                            ClassNotFoundException {
     }

-    public void collect(K key, V value) throws IOException {
+    public void collect(K key, V value, int partition) throws IOException {
       reporter.progress();
       out.write(key, value);
       mapOutputRecordCounter.increment(1);
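This collect() evidently belongs to the direct, no-sort collector used when there are no reduces, so the new partition argument is accepted only to satisfy the widened interface. The method as it reads after the hunk:

    // Assembled from the hunk above: partition is deliberately unused,
    // since a map-only task writes every record to its single RecordWriter.
    public void collect(K key, V value, int partition) throws IOException {
      reporter.progress();
      out.write(key, value);
      mapOutputRecordCounter.increment(1);
    }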
@@ -585,7 +646,6 @@ class MapTask extends Task {
   class MapOutputBuffer<K extends Object, V extends Object>
       implements MapOutputCollector<K, V>, IndexedSortable {
     private final int partitions;
-    private final Partitioner<K, V> partitioner;
     private final JobConf job;
     private final TaskReporter reporter;
     private final Class<K> keyClass;
@@ -653,7 +713,6 @@ class MapTask extends Task {
       this.reporter = reporter;
       localFs = FileSystem.getLocal(job);
       partitions = job.getNumReduceTasks();
-      partitioner = ReflectionUtils.newInstance(job.getPartitionerClass(), job);

       rfs = ((LocalFileSystem)localFs).getRaw();

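These two hunks complete the other side of the move: MapOutputBuffer no longer owns a Partitioner at all, and the partition arrives as an argument at collect() time. One payoff of this seam is testability: the wrappers can now be exercised against a stub collector with no real buffer behind it. A hypothetical capture-style double, assuming same-package access to the nested interface:

    // Hypothetical test double for the refactored seam: records the
    // partition chosen by Old/NewOutputCollector without any real buffer.
    class CapturingCollector<K, V> implements MapOutputCollector<K, V> {
      int lastPartition = Integer.MIN_VALUE;

      public void collect(K key, V value, int partition) {
        lastPartition = partition;
      }
      public void close() { }
      public void flush() { }
    }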
@@ -739,8 +798,8 @@ class MapTask extends Task {
       }
     }

-    public synchronized void collect(K key, V value)
-        throws IOException {
+    public synchronized void collect(K key, V value, int partition
+                                     ) throws IOException {
       reporter.progress();
       if (key.getClass() != keyClass) {
         throw new IOException("Type mismatch in key from map: expected "
@@ -801,7 +860,6 @@ class MapTask extends Task {
       valSerializer.serialize(value);
       int valend = bb.markRecord();

-      final int partition = partitioner.getPartition(key, value, partitions);
       if (partition < 0 || partition >= partitions) {
         throw new IOException("Illegal partition for " + key + " (" +
             partition + ")");
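With the partition now supplied by the caller, this range check becomes the buffer's only guard against a misbehaving partitioner (previously the value was computed locally on the line just removed). A hypothetical partitioner that would now trip the "Illegal partition" IOException:

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.Partitioner;

    // Hypothetical off-by-one partitioner: valid partitions run from 0 to
    // numReduceTasks - 1, so returning numReduceTasks fails the range check.
    public class OffByOnePartitioner<K, V> implements Partitioner<K, V> {
      public void configure(JobConf job) {}

      public int getPartition(K key, V value, int numReduceTasks) {
        return numReduceTasks;
      }
    }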
@@ -821,7 +879,7 @@ class MapTask extends Task {
         kvindex = kvnext;
       } catch (MapBufferTooSmallException e) {
         LOG.info("Record too large for in-memory buffer: " + e.getMessage());
-        spillSingleRecord(key, value);
+        spillSingleRecord(key, value, partition);
         mapOutputRecordCounter.increment(1);
         return;
       }
@@ -1201,11 +1259,10 @@ class MapTask extends Task {
      * the in-memory buffer, so we must spill the record from collect
      * directly to a spill file. Consider this "losing".
      */
-    private void spillSingleRecord(final K key, final V value)
-        throws IOException {
+    private void spillSingleRecord(final K key, final V value,
+                                   int partition) throws IOException {
       long size = kvbuffer.length + partitions * APPROX_HEADER_LENGTH;
       FSDataOutputStream out = null;
-      final int partition = partitioner.getPartition(key, value, partitions);
       try {
         // create spill file
         final SpillRecord spillRec = new SpillRecord(partitions);
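Taken together, the hunks leave the Old/NewOutputCollector wrappers as the single place where the partition is computed per record; both the in-memory path and the oversized-record spill path then reuse that value. A sketch of the resulting new-API call path, using names from the hunks above:

    // Mapper.map()
    //   -> Context.write(k, v)
    //     -> NewOutputCollector.write(k, v)
    //          int p = partitioner.getPartition(k, v, partitions);
    //          -> MapOutputBuffer.collect(k, v, p)
    //               // serialize k and v into kvbuffer; on
    //               // MapBufferTooSmallException fall back to
    //               spillSingleRecord(k, v, p);  // same p, never recomputed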