@@ -30,6 +30,8 @@ import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.locks.Condition;
@@ -37,7 +39,7 @@ import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
@@ -50,12 +52,13 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.io.serializer.Deserializer;
 import org.apache.hadoop.io.serializer.SerializationFactory;
 import org.apache.hadoop.io.serializer.Serializer;
 import org.apache.hadoop.mapred.IFile.Writer;
-import org.apache.hadoop.mapred.IFile.Reader;
 import org.apache.hadoop.mapred.Merger.Segment;
 import org.apache.hadoop.mapred.SortedRanges.SkipRangeIterator;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.util.IndexedSortable;
 import org.apache.hadoop.util.IndexedSorter;
 import org.apache.hadoop.util.Progress;
@@ -72,7 +75,6 @@ class MapTask extends Task {
 
   private BytesWritable split = new BytesWritable();
   private String splitClass;
-  private InputSplit instantiatedSplit = null;
   private final static int APPROX_HEADER_LENGTH = 150;
 
   private static final Log LOG = LogFactory.getLog(MapTask.class.getName());
@@ -131,11 +133,6 @@ class MapTask extends Task {
     split.readFields(in);
   }
 
-  @Override
-  InputSplit getInputSplit() throws UnsupportedOperationException {
-    return instantiatedSplit;
-  }
-
   /**
   * This class wraps the user's record reader to update the counters and progress
   * as records are read.
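
The deleted getInputSplit() accessor is superseded rather than dropped: later in this patch, runOldMapper() hands the split to the TaskReporter via reporter.setInputSplit(inputSplit), and old-API user code keeps reaching it through the Reporter it is passed. A minimal sketch of that call path, assuming only the stock org.apache.hadoop.mapred interfaces (SplitAwareMapper is an illustrative name, not part of this patch):

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class SplitAwareMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, Text> {
  public void map(LongWritable key, Text value,
                  OutputCollector<Text, Text> output, Reporter reporter)
      throws IOException {
    // Served by TaskReporter once runOldMapper() (later in this patch)
    // has called reporter.setInputSplit(inputSplit).
    InputSplit split = reporter.getInputSplit();
    if (split instanceof FileSplit) {
      output.collect(new Text(((FileSplit) split).getPath().getName()), value);
    }
  }
}
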
@@ -147,14 +144,16 @@ class MapTask extends Task {
     private RecordReader<K,V> rawIn;
     private Counters.Counter inputByteCounter;
     private Counters.Counter inputRecordCounter;
+    private TaskReporter reporter;
     private long beforePos = -1;
     private long afterPos = -1;
 
-    TrackedRecordReader(RecordReader<K,V> raw, Counters counters) 
+    TrackedRecordReader(RecordReader<K,V> raw, TaskReporter reporter) 
       throws IOException{
       rawIn = raw;
-      inputRecordCounter = counters.findCounter(MAP_INPUT_RECORDS);
-      inputByteCounter = counters.findCounter(MAP_INPUT_BYTES);
+      inputRecordCounter = reporter.getCounter(MAP_INPUT_RECORDS);
+      inputByteCounter = reporter.getCounter(MAP_INPUT_BYTES);
+      this.reporter = reporter;
     }
 
     public K createKey() {
@@ -181,7 +180,7 @@ class MapTask extends Task {
 
     protected synchronized boolean moveToNext(K key, V value)
       throws IOException {
-      setProgress(getProgress());
+      reporter.setProgress(getProgress());
       beforePos = getPos();
       boolean ret = rawIn.next(key, value);
       afterPos = getPos();
@@ -193,6 +192,9 @@ class MapTask extends Task {
     public float getProgress() throws IOException {
      return rawIn.getProgress();
    }
+    TaskReporter getTaskReporter() {
+      return reporter;
+    }
   }
 
   /**
@@ -207,11 +209,11 @@ class MapTask extends Task {
     private Counters.Counter skipRecCounter;
     private long recIndex = -1;
 
-    SkippingRecordReader(RecordReader<K,V> raw, Counters counters,
-        TaskUmbilicalProtocol umbilical) throws IOException{
-      super(raw,counters);
+    SkippingRecordReader(RecordReader<K,V> raw, TaskUmbilicalProtocol umbilical,
+                         TaskReporter reporter) throws IOException{
+      super(raw, reporter);
       this.umbilical = umbilical;
-      this.skipRecCounter = counters.findCounter(Counter.MAP_SKIPPED_RECORDS);
+      this.skipRecCounter = reporter.getCounter(Counter.MAP_SKIPPED_RECORDS);
       this.toWriteSkipRecs = toWriteSkipRecs() &&
         SkipBadRecords.getSkipOutputPath(conf)!=null;
       skipIt = getSkipRanges().skipRangeIterator();
@@ -261,44 +263,50 @@ class MapTask extends Task {
             skipFile.getFileSystem(conf), conf, skipFile,
             (Class<K>) createKey().getClass(),
             (Class<V>) createValue().getClass(), 
-            CompressionType.BLOCK, getReporter(umbilical));
+            CompressionType.BLOCK, getTaskReporter());
       }
       skipWriter.append(key, value);
     }
   }
 
   @Override
-  @SuppressWarnings("unchecked")
   public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
-    throws IOException {
-
-    final Reporter reporter = getReporter(umbilical);
+    throws IOException, ClassNotFoundException, InterruptedException {
 
     // start thread that will handle communication with parent
-    startCommunicationThread(umbilical);
+    TaskReporter reporter = new TaskReporter(getProgress(), umbilical);
+    reporter.startCommunicationThread();
+    boolean useNewApi = job.getUseNewMapper();
+    initialize(job, getJobID(), reporter, useNewApi);
 
-    initialize(job, reporter);
     // check if it is a cleanupJobTask
     if (cleanupJob) {
-      runCleanup(umbilical);
+      runCleanup(umbilical, reporter);
       return;
     }
     if (setupJob) {
-      runSetupJob(umbilical);
+      runSetupJob(umbilical, reporter);
       return;
     }
 
-    int numReduceTasks = conf.getNumReduceTasks();
-    LOG.info("numReduceTasks: " + numReduceTasks);
-    MapOutputCollector collector = null;
-    if (numReduceTasks > 0) {
-      collector = new MapOutputBuffer(umbilical, job, reporter);
-    } else { 
-      collector = new DirectMapOutputCollector(umbilical, job, reporter);
+    if (useNewApi) {
+      runNewMapper(job, split, umbilical, reporter);
+    } else {
+      runOldMapper(job, split, umbilical, reporter);
     }
+    done(umbilical, reporter);
+  }
+
+  @SuppressWarnings("unchecked")
+  private <INKEY,INVALUE,OUTKEY,OUTVALUE>
+  void runOldMapper(final JobConf job,
+                    final BytesWritable rawSplit,
+                    final TaskUmbilicalProtocol umbilical,
+                    TaskReporter reporter) throws IOException {
+    InputSplit inputSplit = null;
     // reinstantiate the split
     try {
-      instantiatedSplit = (InputSplit) 
+      inputSplit = (InputSplit) 
         ReflectionUtils.newInstance(job.getClassByName(splitClass), job);
     } catch (ClassNotFoundException exp) {
       IOException wrap = new IOException("Split class " + splitClass + 
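
With this hunk, run() becomes a thin dispatcher: jobs written against the new org.apache.hadoop.mapreduce API take runNewMapper(), everything else stays on runOldMapper(). Jobs submitted through the new Job class are flagged as new-API at submission time, which is what job.getUseNewMapper() reads back here. A hedged sketch of a client job that would land on the new path, assuming the 0.20-era org.apache.hadoop.mapreduce.Job API (NewApiWordCount and TokenMapper are illustrative names; exact submission calls may differ between versions):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;

public class NewApiWordCount {

  // Minimal new-API mapper. Mapper.run() pulls records itself through
  // context.nextKeyValue(), so no MapRunnable is involved on this path.
  public static class TokenMapper
      extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      for (String token : value.toString().split("\\s+")) {
        word.set(token);
        context.write(word, ONE);  // routed through NewOutputCollector
      }
    }
  }

  public static void main(String[] args) throws Exception {
    Job job = new Job(new Configuration(), "new-api word count");
    job.setMapperClass(TokenMapper.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    // Input/output paths elided for brevity.
    job.waitForCompletion(true);
  }
}
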
@@ -308,24 +316,28 @@ class MapTask extends Task {
     }
     DataInputBuffer splitBuffer = new DataInputBuffer();
     splitBuffer.reset(split.getBytes(), 0, split.getLength());
-    instantiatedSplit.readFields(splitBuffer);
+    inputSplit.readFields(splitBuffer);
 
-    // if it is a file split, we can give more details
-    if (instantiatedSplit instanceof FileSplit) {
-      FileSplit fileSplit = (FileSplit) instantiatedSplit;
-      job.set("map.input.file", fileSplit.getPath().toString());
-      job.setLong("map.input.start", fileSplit.getStart());
-      job.setLong("map.input.length", fileSplit.getLength());
-    }
-
-    RecordReader rawIn =                  // open input
-      job.getInputFormat().getRecordReader(instantiatedSplit, job, reporter);
-    RecordReader in = isSkipping() ? 
-      new SkippingRecordReader(rawIn, getCounters(), umbilical) :
-      new TrackedRecordReader(rawIn, getCounters());
+    updateJobWithSplit(job, inputSplit);
+    reporter.setInputSplit(inputSplit);
+
+    RecordReader<INKEY,INVALUE> rawIn =                  // open input
+      job.getInputFormat().getRecordReader(inputSplit, job, reporter);
+    RecordReader<INKEY,INVALUE> in = isSkipping() ? 
+      new SkippingRecordReader<INKEY,INVALUE>(rawIn, umbilical, reporter) :
+      new TrackedRecordReader<INKEY,INVALUE>(rawIn, reporter);
     job.setBoolean("mapred.skip.on", isSkipping());
 
-    MapRunnable runner =
+
+    int numReduceTasks = conf.getNumReduceTasks();
+    LOG.info("numReduceTasks: " + numReduceTasks);
+    MapOutputCollector collector = null;
+    if (numReduceTasks > 0) {
+      collector = new MapOutputBuffer(umbilical, job, reporter);
+    } else { 
+      collector = new DirectMapOutputCollector(umbilical, job, reporter);
+    }
+    MapRunnable<INKEY,INVALUE,OUTKEY,OUTVALUE> runner =
      ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
 
    try {
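
runOldMapper() wires the (possibly skipping) record reader and a map-output collector into the MapRunnable obtained from job.getMapRunnerClass(). A simplified sketch of the loop the default runner drives over that pair, written from the public MapRunnable contract rather than copied from the real MapRunner (which additionally handles incremental progress reporting):

import java.io.IOException;

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapRunnable;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.ReflectionUtils;

public class MapRunnerSketch<K1, V1, K2, V2>
    implements MapRunnable<K1, V1, K2, V2> {
  private Mapper<K1, V1, K2, V2> mapper;

  @SuppressWarnings("unchecked")
  public void configure(JobConf job) {
    mapper = ReflectionUtils.newInstance(job.getMapperClass(), job);
  }

  public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
                  Reporter reporter) throws IOException {
    try {
      K1 key = input.createKey();
      V1 value = input.createValue();
      // With this patch, `input` is a Tracked/SkippingRecordReader, so
      // every next() call also bumps MAP_INPUT_RECORDS/MAP_INPUT_BYTES
      // and task progress through the shared TaskReporter.
      while (input.next(key, value)) {
        mapper.map(key, value, output, reporter);
      }
    } finally {
      mapper.close();
    }
  }
}
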
@@ -336,7 +348,168 @@ class MapTask extends Task {
       in.close();                        // close input
       collector.close();
     }
-    done(umbilical);
+  }
+
+  /**
+   * Update the job with details about the file split
+   * @param job the job configuration to update
+   * @param inputSplit the file split
+   */
+  private void updateJobWithSplit(final JobConf job, InputSplit inputSplit) {
+    if (inputSplit instanceof FileSplit) {
+      FileSplit fileSplit = (FileSplit) inputSplit;
+      job.set("map.input.file", fileSplit.getPath().toString());
+      job.setLong("map.input.start", fileSplit.getStart());
+      job.setLong("map.input.length", fileSplit.getLength());
+    }
+  }
+
+  static class NewTrackingRecordReader<K,V> 
+      extends org.apache.hadoop.mapreduce.RecordReader<K,V> {
+    private final org.apache.hadoop.mapreduce.RecordReader<K,V> real;
+    private final org.apache.hadoop.mapreduce.Counter inputRecordCounter;
+
+    NewTrackingRecordReader(org.apache.hadoop.mapreduce.RecordReader<K,V> real,
+                            TaskReporter reporter) {
+      this.real = real;
+      this.inputRecordCounter = reporter.getCounter(MAP_INPUT_RECORDS);
+    }
+
+    @Override
+    public void close() throws IOException {
+      real.close();
+    }
+
+    @Override
+    public K getCurrentKey() throws IOException, InterruptedException {
+      return real.getCurrentKey();
+    }
+
+    @Override
+    public V getCurrentValue() throws IOException, InterruptedException {
+      return real.getCurrentValue();
+    }
+
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+      return real.getProgress();
+    }
+
+    @Override
+    public void initialize(org.apache.hadoop.mapreduce.InputSplit split,
+                           org.apache.hadoop.mapreduce.TaskAttemptContext context
+                           ) throws IOException, InterruptedException {
+      real.initialize(split, context);
+    }
+
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+      boolean result = real.nextKeyValue();
+      if (result) {
+        inputRecordCounter.increment(1);
+      }
+      return result;
+    }
+  }
+
+  private class NewOutputCollector<K,V>
+      extends org.apache.hadoop.mapreduce.RecordWriter<K,V> {
+    private MapOutputCollector<K,V> collector;
+
+    NewOutputCollector(JobConf job, 
+                       TaskUmbilicalProtocol umbilical,
+                       TaskReporter reporter
+                       ) throws IOException {
+      collector = new MapOutputBuffer<K,V>(umbilical, job, reporter);
+    }
+
+    @Override
+    public void write(K key, V value) throws IOException {
+      collector.collect(key, value);
+    }
+
+    @Override
+    public void close(TaskAttemptContext context) throws IOException {
+      collector.flush();
+      collector.close();
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private <INKEY,INVALUE,OUTKEY,OUTVALUE>
+  void runNewMapper(final JobConf job,
+                    final BytesWritable rawSplit,
+                    final TaskUmbilicalProtocol umbilical,
+                    TaskReporter reporter
+                    ) throws IOException, ClassNotFoundException,
+                             InterruptedException {
+    // make a task context so we can get the classes
+    org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
+      new org.apache.hadoop.mapreduce.TaskAttemptContext(job, getTaskID());
+    // make a mapper
+    org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
+      (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
+        ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
+    // make the input format
+    org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
+      (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
+        ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
+    // rebuild the input split
+    org.apache.hadoop.mapreduce.InputSplit split = null;
+    DataInputBuffer splitBuffer = new DataInputBuffer();
+    splitBuffer.reset(rawSplit.getBytes(), 0, rawSplit.getLength());
+    SerializationFactory factory = new SerializationFactory(job);
+    Deserializer<? extends org.apache.hadoop.mapreduce.InputSplit>
+      deserializer = 
+        (Deserializer<? extends org.apache.hadoop.mapreduce.InputSplit>) 
+        factory.getDeserializer(job.getClassByName(splitClass));
+    deserializer.open(splitBuffer);
+    split = deserializer.deserialize(null);
+
+    org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
+      new NewTrackingRecordReader<INKEY,INVALUE>
+          (inputFormat.createRecordReader(split, taskContext), reporter);
+
+    job.setBoolean("mapred.skip.on", isSkipping());
+    org.apache.hadoop.mapreduce.RecordWriter output = null;
+    org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context 
+         mapperContext = null;
+    try {
+      Constructor<org.apache.hadoop.mapreduce.Mapper.Context> contextConstructor =
+        org.apache.hadoop.mapreduce.Mapper.Context.class.getConstructor
+        (new Class[]{org.apache.hadoop.mapreduce.Mapper.class,
+                     Configuration.class,
+                     org.apache.hadoop.mapreduce.TaskAttemptID.class,
+                     org.apache.hadoop.mapreduce.RecordReader.class,
+                     org.apache.hadoop.mapreduce.RecordWriter.class,
+                     org.apache.hadoop.mapreduce.OutputCommitter.class,
+                     org.apache.hadoop.mapreduce.StatusReporter.class,
+                     org.apache.hadoop.mapreduce.InputSplit.class});
+
+      // get an output object
+      if (job.getNumReduceTasks() == 0) {
+        output = outputFormat.getRecordWriter(taskContext);
+      } else {
+        output = new NewOutputCollector(job, umbilical, reporter);
+      }
+
+      mapperContext = contextConstructor.newInstance(mapper, job, getTaskID(),
+                                                     input, output, committer,
+                                                     reporter, split);
+
+      input.initialize(split, mapperContext);
+      mapper.run(mapperContext);
+      input.close();
+      output.close(mapperContext);
+    } catch (NoSuchMethodException e) {
+      throw new IOException("Can't find Context constructor", e);
+    } catch (InstantiationException e) {
+      throw new IOException("Can't create Context", e);
+    } catch (InvocationTargetException e) {
+      throw new IOException("Can't invoke Context constructor", e);
+    } catch (IllegalAccessException e) {
+      throw new IOException("Can't invoke Context constructor", e);
+    }
   }
 
   interface MapOutputCollector<K, V>
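
The reflective construction in runNewMapper() follows a general Java rule: Mapper.Context is a non-static inner class, so the compiler adds the enclosing instance as a hidden first constructor parameter, and reflection exposes it. That is why org.apache.hadoop.mapreduce.Mapper.class leads the getConstructor() parameter array and `mapper` leads the newInstance() arguments above. A self-contained demonstration of the idiom (InnerCtorDemo is illustrative, not from this patch):

import java.lang.reflect.Constructor;

public class InnerCtorDemo {
  public class Inner {
    public final String tag;
    public Inner(String tag) { this.tag = tag; }
  }

  public static void main(String[] args) throws Exception {
    InnerCtorDemo outer = new InnerCtorDemo();
    // The declared constructor is Inner(String); reflectively it is
    // Inner(InnerCtorDemo, String) because of the hidden outer instance.
    Constructor<Inner> ctor =
        Inner.class.getConstructor(InnerCtorDemo.class, String.class);
    Inner inner = ctor.newInstance(outer, "built reflectively");
    System.out.println(inner.tag);  // prints: built reflectively
  }
}
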
@@ -353,21 +526,20 @@ class MapTask extends Task {
 
     private RecordWriter<K, V> out = null;
 
-    private Reporter reporter = null;
+    private TaskReporter reporter = null;
 
     private final Counters.Counter mapOutputRecordCounter;
 
     @SuppressWarnings("unchecked")
     public DirectMapOutputCollector(TaskUmbilicalProtocol umbilical,
-        JobConf job, Reporter reporter) throws IOException {
+        JobConf job, TaskReporter reporter) throws IOException {
       this.reporter = reporter;
       String finalName = getOutputName(getPartition());
       FileSystem fs = FileSystem.get(job);
 
       out = job.getOutputFormat().getRecordWriter(fs, job, finalName, reporter);
 
-      Counters counters = getCounters();
-      mapOutputRecordCounter = counters.findCounter(MAP_OUTPUT_RECORDS);
+      mapOutputRecordCounter = reporter.getCounter(MAP_OUTPUT_RECORDS);
     }
 
     public void close() throws IOException {
@@ -393,7 +565,7 @@ class MapTask extends Task {
     private final int partitions;
     private final Partitioner<K, V> partitioner;
     private final JobConf job;
-    private final Reporter reporter;
+    private final TaskReporter reporter;
     private final Class<K> keyClass;
     private final Class<V> valClass;
     private final RawComparator<K> comparator;
@@ -454,7 +626,7 @@ class MapTask extends Task {
 
     @SuppressWarnings("unchecked")
     public MapOutputBuffer(TaskUmbilicalProtocol umbilical, JobConf job,
-                           Reporter reporter) throws IOException {
+                           TaskReporter reporter) throws IOException {
       this.job = job;
       this.reporter = reporter;
       localFs = FileSystem.getLocal(job);
@@ -504,11 +676,10 @@ class MapTask extends Task {
       valSerializer = serializationFactory.getSerializer(valClass);
       valSerializer.open(bb);
       // counters
-      Counters counters = getCounters();
-      mapOutputByteCounter = counters.findCounter(MAP_OUTPUT_BYTES);
-      mapOutputRecordCounter = counters.findCounter(MAP_OUTPUT_RECORDS);
-      combineInputCounter = counters.findCounter(COMBINE_INPUT_RECORDS);
-      combineOutputCounter = counters.findCounter(COMBINE_OUTPUT_RECORDS);
+      mapOutputByteCounter = reporter.getCounter(MAP_OUTPUT_BYTES);
+      mapOutputRecordCounter = reporter.getCounter(MAP_OUTPUT_RECORDS);
+      combineInputCounter = reporter.getCounter(COMBINE_INPUT_RECORDS);
+      combineOutputCounter = reporter.getCounter(COMBINE_OUTPUT_RECORDS);
       // compression
       if (job.getCompressMapOutput()) {
         Class<? extends CompressionCodec> codecClass =
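
This hunk completes a pattern that runs through the whole patch: counters previously looked up with getCounters().findCounter(...) now come from the TaskReporter, the same object handed to user code as its Reporter. For contrast, a hedged example of user code incrementing its own counter through that facade (CounterMapper and MyCounters are illustrative, not part of Hadoop):

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class CounterMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, IntWritable> {

  enum MyCounters { EMPTY_LINES }  // illustrative user-defined counter

  public void map(LongWritable key, Text value,
                  OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException {
    if (value.getLength() == 0) {
      // Same counter machinery the collector uses internally, reached
      // through the Reporter facade backed by TaskReporter.
      reporter.incrCounter(MyCounters.EMPTY_LINES, 1);
      return;
    }
    output.collect(value, new IntWritable(1));
  }
}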