|
@@ -54,12 +54,15 @@ class Merger {
|
|
CompressionCodec codec,
|
|
CompressionCodec codec,
|
|
Path[] inputs, boolean deleteInputs,
|
|
Path[] inputs, boolean deleteInputs,
|
|
int mergeFactor, Path tmpDir,
|
|
int mergeFactor, Path tmpDir,
|
|
- RawComparator<K> comparator, Progressable reporter)
|
|
|
|
|
|
+ RawComparator<K> comparator, Progressable reporter,
|
|
|
|
+ Counters.Counter readsCounter,
|
|
|
|
+ Counters.Counter writesCounter)
|
|
throws IOException {
|
|
throws IOException {
|
|
return
|
|
return
|
|
new MergeQueue<K, V>(conf, fs, inputs, deleteInputs, codec, comparator,
|
|
new MergeQueue<K, V>(conf, fs, inputs, deleteInputs, codec, comparator,
|
|
reporter).merge(keyClass, valueClass,
|
|
reporter).merge(keyClass, valueClass,
|
|
- mergeFactor, tmpDir);
|
|
|
|
|
|
+ mergeFactor, tmpDir,
|
|
|
|
+ readsCounter, writesCounter);
|
|
}
|
|
}
|
|
|
|
|
|
public static <K extends Object, V extends Object>
|
|
public static <K extends Object, V extends Object>
|
|
@@ -67,10 +70,12 @@ class Merger {
|
|
Class<K> keyClass, Class<V> valueClass,
|
|
Class<K> keyClass, Class<V> valueClass,
|
|
List<Segment<K, V>> segments,
|
|
List<Segment<K, V>> segments,
|
|
int mergeFactor, Path tmpDir,
|
|
int mergeFactor, Path tmpDir,
|
|
- RawComparator<K> comparator, Progressable reporter)
|
|
|
|
|
|
+ RawComparator<K> comparator, Progressable reporter,
|
|
|
|
+ Counters.Counter readsCounter,
|
|
|
|
+ Counters.Counter writesCounter)
|
|
throws IOException {
|
|
throws IOException {
|
|
return merge(conf, fs, keyClass, valueClass, segments, mergeFactor, tmpDir,
|
|
return merge(conf, fs, keyClass, valueClass, segments, mergeFactor, tmpDir,
|
|
- comparator, reporter, false);
|
|
|
|
|
|
+ comparator, reporter, false, readsCounter, writesCounter);
|
|
}
|
|
}
|
|
|
|
|
|
public static <K extends Object, V extends Object>
|
|
public static <K extends Object, V extends Object>
|
|
@@ -79,11 +84,14 @@ class Merger {
|
|
List<Segment<K, V>> segments,
|
|
List<Segment<K, V>> segments,
|
|
int mergeFactor, Path tmpDir,
|
|
int mergeFactor, Path tmpDir,
|
|
RawComparator<K> comparator, Progressable reporter,
|
|
RawComparator<K> comparator, Progressable reporter,
|
|
- boolean sortSegments)
|
|
|
|
|
|
+ boolean sortSegments,
|
|
|
|
+ Counters.Counter readsCounter,
|
|
|
|
+ Counters.Counter writesCounter)
|
|
throws IOException {
|
|
throws IOException {
|
|
return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
|
|
return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
|
|
sortSegments).merge(keyClass, valueClass,
|
|
sortSegments).merge(keyClass, valueClass,
|
|
- mergeFactor, tmpDir);
|
|
|
|
|
|
+ mergeFactor, tmpDir,
|
|
|
|
+ readsCounter, writesCounter);
|
|
}
|
|
}
|
|
|
|
|
|
static <K extends Object, V extends Object>
|
|
static <K extends Object, V extends Object>
|
|
@@ -92,12 +100,15 @@ class Merger {
|
|
List<Segment<K, V>> segments,
|
|
List<Segment<K, V>> segments,
|
|
int mergeFactor, int inMemSegments, Path tmpDir,
|
|
int mergeFactor, int inMemSegments, Path tmpDir,
|
|
RawComparator<K> comparator, Progressable reporter,
|
|
RawComparator<K> comparator, Progressable reporter,
|
|
- boolean sortSegments)
|
|
|
|
|
|
+ boolean sortSegments,
|
|
|
|
+ Counters.Counter readsCounter,
|
|
|
|
+ Counters.Counter writesCounter)
|
|
throws IOException {
|
|
throws IOException {
|
|
return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
|
|
return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
|
|
sortSegments).merge(keyClass, valueClass,
|
|
sortSegments).merge(keyClass, valueClass,
|
|
mergeFactor, inMemSegments,
|
|
mergeFactor, inMemSegments,
|
|
- tmpDir);
|
|
|
|
|
|
+ tmpDir,
|
|
|
|
+ readsCounter, writesCounter);
|
|
}
|
|
}
|
|
|
|
|
|
public static <K extends Object, V extends Object>
|
|
public static <K extends Object, V extends Object>
|
|
@@ -144,9 +155,9 @@ class Merger {
|
|
this.segmentLength = reader.getLength();
|
|
this.segmentLength = reader.getLength();
|
|
}
|
|
}
|
|
|
|
|
|
- private void init() throws IOException {
|
|
|
|
|
|
+ private void init(Counters.Counter readsCounter) throws IOException {
|
|
if (reader == null) {
|
|
if (reader == null) {
|
|
- reader = new Reader<K, V>(conf, fs, file, codec);
|
|
|
|
|
|
+ reader = new Reader<K, V>(conf, fs, file, codec, readsCounter);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -309,13 +320,18 @@ class Merger {
|
|
}
|
|
}
|
|
|
|
|
|
public RawKeyValueIterator merge(Class<K> keyClass, Class<V> valueClass,
|
|
public RawKeyValueIterator merge(Class<K> keyClass, Class<V> valueClass,
|
|
- int factor, Path tmpDir)
|
|
|
|
|
|
+ int factor, Path tmpDir,
|
|
|
|
+ Counters.Counter readsCounter,
|
|
|
|
+ Counters.Counter writesCounter)
|
|
throws IOException {
|
|
throws IOException {
|
|
- return merge(keyClass, valueClass, factor, 0, tmpDir);
|
|
|
|
|
|
+ return merge(keyClass, valueClass, factor, 0, tmpDir,
|
|
|
|
+ readsCounter, writesCounter);
|
|
}
|
|
}
|
|
|
|
|
|
RawKeyValueIterator merge(Class<K> keyClass, Class<V> valueClass,
|
|
RawKeyValueIterator merge(Class<K> keyClass, Class<V> valueClass,
|
|
- int factor, int inMem, Path tmpDir)
|
|
|
|
|
|
+ int factor, int inMem, Path tmpDir,
|
|
|
|
+ Counters.Counter readsCounter,
|
|
|
|
+ Counters.Counter writesCounter)
|
|
throws IOException {
|
|
throws IOException {
|
|
LOG.info("Merging " + segments.size() + " sorted segments");
|
|
LOG.info("Merging " + segments.size() + " sorted segments");
|
|
|
|
|
|
@@ -344,7 +360,7 @@ class Merger {
|
|
for (Segment<K, V> segment : mStream) {
|
|
for (Segment<K, V> segment : mStream) {
|
|
// Initialize the segment at the last possible moment;
|
|
// Initialize the segment at the last possible moment;
|
|
// this helps in ensuring we don't use buffers until we need them
|
|
// this helps in ensuring we don't use buffers until we need them
|
|
- segment.init();
|
|
|
|
|
|
+ segment.init(readsCounter);
|
|
long startPos = segment.getPosition();
|
|
long startPos = segment.getPosition();
|
|
boolean hasNext = segment.next();
|
|
boolean hasNext = segment.next();
|
|
long endPos = segment.getPosition();
|
|
long endPos = segment.getPosition();
|
|
@@ -417,7 +433,8 @@ class Merger {
|
|
approxOutputSize, conf);
|
|
approxOutputSize, conf);
|
|
|
|
|
|
Writer<K, V> writer =
|
|
Writer<K, V> writer =
|
|
- new Writer<K, V>(conf, fs, outputFile, keyClass, valueClass, codec);
|
|
|
|
|
|
+ new Writer<K, V>(conf, fs, outputFile, keyClass, valueClass, codec,
|
|
|
|
+ writesCounter);
|
|
writeFile(this, writer, reporter);
|
|
writeFile(this, writer, reporter);
|
|
writer.close();
|
|
writer.close();
|
|
|
|
|