|
@@ -235,23 +235,25 @@ public class SequenceFile {
|
|
|
/** Open the named file. */
|
|
|
public Reader(FileSystem fs, Path file, Configuration conf)
|
|
|
throws IOException {
|
|
|
- this(fs, file, conf.getInt("io.file.buffer.size", 4096));
|
|
|
- this.conf = conf;
|
|
|
+ this(fs, file, conf.getInt("io.file.buffer.size", 4096), conf);
|
|
|
}
|
|
|
|
|
|
- private Reader(FileSystem fs, Path name, int bufferSize) throws IOException {
|
|
|
+ private Reader(FileSystem fs, Path name, int bufferSize,
|
|
|
+ Configuration conf) throws IOException {
|
|
|
this.fs = fs;
|
|
|
this.file = name;
|
|
|
this.in = fs.open(file, bufferSize);
|
|
|
this.end = fs.getLength(file);
|
|
|
+ this.conf = conf;
|
|
|
init();
|
|
|
}
|
|
|
|
|
|
- private Reader(FileSystem fs, Path file, int bufferSize, long start, long length)
|
|
|
- throws IOException {
|
|
|
+ private Reader(FileSystem fs, Path file, int bufferSize, long start,
|
|
|
+ long length, Configuration conf) throws IOException {
|
|
|
this.fs = fs;
|
|
|
this.file = file;
|
|
|
this.in = fs.open(file, bufferSize);
|
|
|
+ this.conf = conf;
|
|
|
seek(start);
|
|
|
init();
|
|
|
|
|
@@ -742,7 +744,7 @@ public class SequenceFile {
|
|
|
totalCount+= count;
|
|
|
|
|
|
Reader reader = new Reader(fs, inName, memory/(factor+1),
|
|
|
- in.getPos(), length);
|
|
|
+ in.getPos(), length, conf);
|
|
|
reader.sync = null; // disable sync on temp files
|
|
|
|
|
|
MergeStream ms = new MergeStream(reader); // add segment to queue
|
|
@@ -801,7 +803,7 @@ public class SequenceFile {
|
|
|
for (int i = 0; i < inFiles.length; i++) {
|
|
|
Path inFile = inFiles[i];
|
|
|
MergeStream ms =
|
|
|
- new MergeStream(new Reader(fs, inFile, memory/(factor+1)));
|
|
|
+ new MergeStream(new Reader(fs, inFile, memory/(factor+1), conf));
|
|
|
if (ms.next())
|
|
|
queue.put(ms);
|
|
|
}
|