@@ -18,34 +18,41 @@
 
 package org.apache.hadoop.mapred;
 
-import java.io.*;
-import java.util.*;
-
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.io.SequenceFile.Sorter;
-import org.apache.hadoop.io.SequenceFile.Sorter.*;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.metrics.MetricsRecord;
-
-import org.apache.commons.logging.*;
-import org.apache.hadoop.metrics.MetricsUtil;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.SequenceFile.Sorter;
+import org.apache.hadoop.io.SequenceFile.Sorter.RawKeyValueIterator;
+import org.apache.hadoop.io.SequenceFile.Sorter.SegmentDescriptor;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapred.ReduceTask.ValuesIterator;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.mapred.ReduceTask.ValuesIterator;
-import org.apache.hadoop.metrics.MetricsContext;
-import org.apache.hadoop.metrics.Updater;
 
 
 /** A Map task. */
 class MapTask extends Task {
-
-  private MapTaskMetrics myMetrics = null;
 
   private BytesWritable split = new BytesWritable();
   private String splitClass;
@@ -58,28 +65,11 @@ class MapTask extends Task {
     setPhase(TaskStatus.Phase.MAP);
   }
 
-  private class MapTaskMetrics {
-    private MetricsRecord mapInputMetrics = null;
-    private MetricsRecord mapOutputMetrics = null;
-
-    MapTaskMetrics(String user) {
-      MetricsContext context = MetricsUtil.getContext("mapred");
-      mapInputMetrics = MetricsUtil.createRecord(context, "mapInput", "user", user);
-      mapOutputMetrics = MetricsUtil.createRecord(context, "mapOutput", "user", user);
-    }
-
-    synchronized void mapInput(int numBytes) {
-      mapInputMetrics.incrMetric("map_input_records", 1);
-      mapInputMetrics.incrMetric("map_input_bytes", numBytes);
-      mapInputMetrics.update();
-    }
-
-    synchronized void mapOutput(int numBytes) {
-      mapOutputMetrics.incrMetric("map_output_records", 1);
-      mapOutputMetrics.incrMetric("map_output_bytes", numBytes);
-      mapOutputMetrics.update();
-    }
-
+  private enum Counter {
+    INPUT_RECORDS,
+    INPUT_BYTES,
+    OUTPUT_RECORDS,
+    OUTPUT_BYTES
   }
 
   public MapTask() {}
@@ -121,16 +111,12 @@ class MapTask extends Task {
     super.readFields(in);
     splitClass = Text.readString(in);
     split.readFields(in);
-    if (myMetrics == null) {
-      myMetrics = new MapTaskMetrics("unknown");
-    }
   }
 
   public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
     throws IOException {
 
-    myMetrics = new MapTaskMetrics(job.getUser());
-    Reporter reporter = getReporter(umbilical, getProgress());
+    final Reporter reporter = getReporter(umbilical);
 
     MapOutputBuffer collector = new MapOutputBuffer(umbilical, job, reporter);
 
@@ -175,7 +161,8 @@ class MapTask extends Task {
         setProgress(getProgress());
         long beforePos = getPos();
         boolean ret = rawIn.next(key, value);
-        myMetrics.mapInput((int)(getPos() - beforePos));
+        reporter.incrCounter(Counter.INPUT_RECORDS, 1);
+        reporter.incrCounter(Counter.INPUT_BYTES, (getPos() - beforePos));
        return ret;
       }
       public long getPos() throws IOException { return rawIn.getPos(); }
@@ -337,7 +324,9 @@ class MapTask extends Task {
       int partNumber = partitioner.getPartition(key, value, partitions);
       sortImpl[partNumber].addKeyValue(keyOffset, keyLength, valLength);
 
-      myMetrics.mapOutput((int)(keyValBuffer.getLength() - keyOffset));
+      reporter.incrCounter(Counter.OUTPUT_RECORDS, 1);
+      reporter.incrCounter(Counter.OUTPUT_BYTES,
+                           (keyValBuffer.getLength() - keyOffset));
 
       //now check whether we need to spill to disk
       long totalMem = 0;
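
Not part of the patch: a minimal sketch of how user code consumes the counter mechanism this change introduces inside MapTask. An enum names a group of counters, and Reporter.incrCounter(Enum, long) replaces the per-task MetricsRecord bookkeeping removed above, with the framework aggregating increments per task. LineCountingMapper and its Records enum are hypothetical names, and the non-generic Mapper signature assumes the pre-generics org.apache.hadoop.mapred API of this era.

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class LineCountingMapper extends MapReduceBase implements Mapper {

  // Hypothetical user-defined counters, declared the same way as the
  // private Counter enum the patch adds to MapTask.
  enum Records { SEEN, EMPTY }

  private static final IntWritable ONE = new IntWritable(1);
  private final Text word = new Text();

  public void map(WritableComparable key, Writable value,
                  OutputCollector output, Reporter reporter)
      throws IOException {
    String line = ((Text) value).toString().trim();
    if (line.length() == 0) {
      // Increments are accumulated in the task and shipped to the
      // framework along with the task's status updates.
      reporter.incrCounter(Records.EMPTY, 1);
      return;
    }
    reporter.incrCounter(Records.SEEN, 1);
    for (String token : line.split("\\s+")) {
      word.set(token);
      output.collect(word, ONE);
    }
  }
}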