|
@@ -49,7 +49,6 @@ public class SequenceFile {
|
|
|
public static class Writer {
|
|
|
private FSDataOutputStream out;
|
|
|
private DataOutputBuffer buffer = new DataOutputBuffer();
|
|
|
- private FileSystem fs = null;
|
|
|
private Path target = null;
|
|
|
|
|
|
private Class keyClass;
|
|
@@ -95,7 +94,6 @@ public class SequenceFile {
|
|
|
public Writer(FileSystem fs, Path name,
|
|
|
Class keyClass, Class valClass, boolean compress)
|
|
|
throws IOException {
|
|
|
- this.fs = fs;
|
|
|
this.target = name;
|
|
|
init(fs.create(target), keyClass, valClass, compress);
|
|
|
}
|
|
@@ -205,7 +203,6 @@ public class SequenceFile {
|
|
|
private FSDataInputStream in;
|
|
|
private DataOutputBuffer outBuf = new DataOutputBuffer();
|
|
|
private DataInputBuffer inBuf = new DataInputBuffer();
|
|
|
- private FileSystem fs = null;
|
|
|
|
|
|
private byte[] version = new byte[VERSION.length];
|
|
|
|
|
@@ -239,7 +236,6 @@ public class SequenceFile {
|
|
|
|
|
|
private Reader(FileSystem fs, Path name, int bufferSize,
|
|
|
Configuration conf) throws IOException {
|
|
|
- this.fs = fs;
|
|
|
this.file = name;
|
|
|
this.in = fs.open(file, bufferSize);
|
|
|
this.end = fs.getLength(file);
|
|
@@ -249,7 +245,6 @@ public class SequenceFile {
|
|
|
|
|
|
private Reader(FileSystem fs, Path file, int bufferSize, long start,
|
|
|
long length, Configuration conf) throws IOException {
|
|
|
- this.fs = fs;
|
|
|
this.file = file;
|
|
|
this.in = fs.open(file, bufferSize);
|
|
|
this.conf = conf;
|
|
@@ -465,8 +460,7 @@ public class SequenceFile {
|
|
|
|
|
|
private WritableComparator comparator;
|
|
|
|
|
|
- private Path inFile; // when sorting
|
|
|
- private Path[] inFiles; // when merging
|
|
|
+ private Path[] inFiles; // when merging or sorting
|
|
|
|
|
|
private Path outFile;
|
|
|
|
|
@@ -508,16 +502,22 @@ public class SequenceFile {
|
|
|
/** Get the total amount of buffer memory, in bytes.*/
|
|
|
public int getMemory() { return memory; }
|
|
|
|
|
|
- /** Perform a file sort.*/
|
|
|
- public void sort(Path inFile, Path outFile) throws IOException {
|
|
|
+ /**
|
|
|
+ * Perform a file sort from a set of input files into an output file.
|
|
|
+ * @param inFiles the files to be sorted
|
|
|
+ * @param outFile the sorted output file
|
|
|
+ * @param deleteInput should the input files be deleted as they are read?
|
|
|
+ */
|
|
|
+ public void sort(Path[] inFiles, Path outFile,
|
|
|
+ boolean deleteInput) throws IOException {
|
|
|
if (fs.exists(outFile)) {
|
|
|
throw new IOException("already exists: " + outFile);
|
|
|
}
|
|
|
|
|
|
- this.inFile = inFile;
|
|
|
+ this.inFiles = inFiles;
|
|
|
this.outFile = outFile;
|
|
|
|
|
|
- int segments = sortPass();
|
|
|
+ int segments = sortPass(deleteInput);
|
|
|
int pass = 1;
|
|
|
while (segments > 1) {
|
|
|
segments = mergePass(pass, segments <= factor);
|
|
@@ -525,11 +525,20 @@ public class SequenceFile {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private int sortPass() throws IOException {
|
|
|
+ /**
|
|
|
+ * The backwards compatible interface to sort.
|
|
|
+ * @param inFile the input file to sort
|
|
|
+ * @param outFile the sorted output file
|
|
|
+ */
|
|
|
+ public void sort(Path inFile, Path outFile) throws IOException {
|
|
|
+ sort(new Path[]{inFile}, outFile, false);
|
|
|
+ }
|
|
|
+
|
|
|
+ private int sortPass(boolean deleteInput) throws IOException {
|
|
|
LOG.debug("running sort pass");
|
|
|
- SortPass sortPass = new SortPass(this.conf); // make the SortPass
|
|
|
+ SortPass sortPass = new SortPass(); // make the SortPass
|
|
|
try {
|
|
|
- return sortPass.run(); // run it
|
|
|
+ return sortPass.run(deleteInput); // run it
|
|
|
} finally {
|
|
|
sortPass.close(); // close it
|
|
|
}
|
|
@@ -550,13 +559,15 @@ public class SequenceFile {
|
|
|
private FSDataOutputStream out;
|
|
|
private Path outName;
|
|
|
|
|
|
- public SortPass(Configuration conf) throws IOException {
|
|
|
- in = new Reader(fs, inFile, conf);
|
|
|
- }
|
|
|
-
|
|
|
- public int run() throws IOException {
|
|
|
+ public int run(boolean deleteInput) throws IOException {
|
|
|
int segments = 0;
|
|
|
- boolean atEof = false;
|
|
|
+ int currentFile = 0;
|
|
|
+ boolean atEof = currentFile >= inFiles.length;
|
|
|
+ boolean isCompressed = false;
|
|
|
+ if (!atEof) {
|
|
|
+ in = new Reader(fs, inFiles[currentFile], conf);
|
|
|
+ isCompressed = in.isCompressed();
|
|
|
+ }
|
|
|
while (!atEof) {
|
|
|
int count = 0;
|
|
|
buffer.reset();
|
|
@@ -564,12 +575,21 @@ public class SequenceFile {
|
|
|
|
|
|
int start = buffer.getLength(); // read an entry into buffer
|
|
|
int keyLength = in.next(buffer);
|
|
|
- int length = buffer.getLength() - start;
|
|
|
-
|
|
|
if (keyLength == -1) {
|
|
|
- atEof = true;
|
|
|
- break;
|
|
|
+ in.close();
|
|
|
+ if (deleteInput) {
|
|
|
+ fs.delete(inFiles[currentFile]);
|
|
|
+ }
|
|
|
+ currentFile += 1;
|
|
|
+ atEof = currentFile >= inFiles.length;
|
|
|
+ if (!atEof) {
|
|
|
+ in = new Reader(fs, inFiles[currentFile], conf);
|
|
|
+ } else {
|
|
|
+ in = null;
|
|
|
+ }
|
|
|
+ continue;
|
|
|
}
|
|
|
+ int length = buffer.getLength() - start;
|
|
|
|
|
|
if (count == starts.length)
|
|
|
grow();
|
|
@@ -586,15 +606,16 @@ public class SequenceFile {
|
|
|
LOG.info("flushing segment " + segments);
|
|
|
rawBuffer = buffer.getData();
|
|
|
sort(count);
|
|
|
- flush(count, segments==0 && atEof);
|
|
|
+ flush(count, isCompressed, segments==0 && atEof);
|
|
|
segments++;
|
|
|
}
|
|
|
return segments;
|
|
|
}
|
|
|
|
|
|
public void close() throws IOException {
|
|
|
- in.close();
|
|
|
-
|
|
|
+ if (in != null) {
|
|
|
+ in.close();
|
|
|
+ }
|
|
|
if (out != null) {
|
|
|
out.close();
|
|
|
}
|
|
@@ -615,7 +636,8 @@ public class SequenceFile {
|
|
|
return result;
|
|
|
}
|
|
|
|
|
|
- private void flush(int count, boolean done) throws IOException {
|
|
|
+ private void flush(int count, boolean isCompressed,
|
|
|
+ boolean done) throws IOException {
|
|
|
if (out == null) {
|
|
|
outName = done ? outFile : outFile.suffix(".0");
|
|
|
out = fs.create(outName);
|
|
@@ -630,7 +652,7 @@ public class SequenceFile {
|
|
|
out.writeLong(count); // write count
|
|
|
}
|
|
|
|
|
|
- Writer writer = new Writer(out, keyClass, valClass, in.isCompressed());
|
|
|
+ Writer writer = new Writer(out, keyClass, valClass, isCompressed);
|
|
|
if (!done) {
|
|
|
writer.sync = null; // disable sync on temp files
|
|
|
}
|
|
@@ -701,7 +723,6 @@ public class SequenceFile {
|
|
|
}
|
|
|
|
|
|
private class MergePass {
|
|
|
- private int pass;
|
|
|
private boolean last;
|
|
|
|
|
|
private MergeQueue queue;
|
|
@@ -709,7 +730,6 @@ public class SequenceFile {
|
|
|
private Path inName;
|
|
|
|
|
|
public MergePass(int pass, boolean last) throws IOException {
|
|
|
- this.pass = pass;
|
|
|
this.last = last;
|
|
|
|
|
|
this.queue =
|