|
@@ -43,8 +43,6 @@ import org.apache.hadoop.util.Progressable;
|
|
|
class Merger {
|
|
|
private static final Log LOG = LogFactory.getLog(Merger.class);
|
|
|
|
|
|
- private static final long PROGRESS_BAR = 10000;
|
|
|
-
|
|
|
// Local directories
|
|
|
private static LocalDirAllocator lDirAlloc =
|
|
|
new LocalDirAllocator("mapred.local.dir");
|
|
@@ -78,13 +76,17 @@ class Merger {
|
|
|
|
|
|
public static <K extends Object, V extends Object>
|
|
|
void writeFile(RawKeyValueIterator records, Writer<K, V> writer,
|
|
|
- Progressable progressable)
|
|
|
+ Progressable progressable, Configuration conf)
|
|
|
throws IOException {
|
|
|
+
|
|
|
long recordCtr = 0;
|
|
|
+ long progressBar = conf.getLong("mapred.merge.recordsBeforeProgress",
|
|
|
+ 10000);
|
|
|
+
|
|
|
while(records.next()) {
|
|
|
writer.append(records.getKey(), records.getValue());
|
|
|
|
|
|
- if ((++recordCtr % PROGRESS_BAR) == 0) {
|
|
|
+ if ((recordCtr++ % progressBar) == 0) {
|
|
|
progressable.progress();
|
|
|
}
|
|
|
}
|
|
@@ -372,7 +374,7 @@ class Merger {
|
|
|
|
|
|
Writer<K, V> writer =
|
|
|
new Writer<K, V>(conf, fs, outputFile, keyClass, valueClass, codec);
|
|
|
- writeFile(this, writer, reporter);
|
|
|
+ writeFile(this, writer, reporter, conf);
|
|
|
writer.close();
|
|
|
|
|
|
//we finished one single level merge; now clean up the priority
|