|
@@ -1588,6 +1588,7 @@ public class MapTask extends Task {
|
|
|
final long size = distanceTo(bufstart, bufend, bufvoid) +
|
|
|
partitions * APPROX_HEADER_LENGTH;
|
|
|
FSDataOutputStream out = null;
|
|
|
+ FSDataOutputStream partitionOut = null;
|
|
|
try {
|
|
|
// create spill file
|
|
|
final SpillRecord spillRec = new SpillRecord(partitions);
|
|
@@ -1608,7 +1609,7 @@ public class MapTask extends Task {
|
|
|
IFile.Writer<K, V> writer = null;
|
|
|
try {
|
|
|
long segmentStart = out.getPos();
|
|
|
- FSDataOutputStream partitionOut = CryptoUtils.wrapIfNecessary(job, out);
|
|
|
+ partitionOut = CryptoUtils.wrapIfNecessary(job, out, false);
|
|
|
writer = new Writer<K, V>(job, partitionOut, keyClass, valClass, codec,
|
|
|
spilledRecordsCounter);
|
|
|
if (combinerRunner == null) {
|
|
@@ -1643,6 +1644,10 @@ public class MapTask extends Task {
|
|
|
|
|
|
// close the writer
|
|
|
writer.close();
|
|
|
+ if (partitionOut != out) {
|
|
|
+ partitionOut.close();
|
|
|
+ partitionOut = null;
|
|
|
+ }
|
|
|
|
|
|
// record offsets
|
|
|
rec.startOffset = segmentStart;
|
|
@@ -1671,6 +1676,9 @@ public class MapTask extends Task {
|
|
|
++numSpills;
|
|
|
} finally {
|
|
|
if (out != null) out.close();
|
|
|
+ if (partitionOut != null) {
|
|
|
+ partitionOut.close();
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -1683,6 +1691,7 @@ public class MapTask extends Task {
|
|
|
int partition) throws IOException {
|
|
|
long size = kvbuffer.length + partitions * APPROX_HEADER_LENGTH;
|
|
|
FSDataOutputStream out = null;
|
|
|
+ FSDataOutputStream partitionOut = null;
|
|
|
try {
|
|
|
// create spill file
|
|
|
final SpillRecord spillRec = new SpillRecord(partitions);
|
|
@@ -1697,7 +1706,7 @@ public class MapTask extends Task {
|
|
|
try {
|
|
|
long segmentStart = out.getPos();
|
|
|
// Create a new codec, don't care!
|
|
|
- FSDataOutputStream partitionOut = CryptoUtils.wrapIfNecessary(job, out);
|
|
|
+ partitionOut = CryptoUtils.wrapIfNecessary(job, out, false);
|
|
|
writer = new IFile.Writer<K,V>(job, partitionOut, keyClass, valClass, codec,
|
|
|
spilledRecordsCounter);
|
|
|
|
|
@@ -1709,6 +1718,10 @@ public class MapTask extends Task {
|
|
|
mapOutputByteCounter.increment(out.getPos() - recordStart);
|
|
|
}
|
|
|
writer.close();
|
|
|
+ if (partitionOut != out) {
|
|
|
+ partitionOut.close();
|
|
|
+ partitionOut = null;
|
|
|
+ }
|
|
|
|
|
|
// record offsets
|
|
|
rec.startOffset = segmentStart;
|
|
@@ -1736,6 +1749,9 @@ public class MapTask extends Task {
|
|
|
++numSpills;
|
|
|
} finally {
|
|
|
if (out != null) out.close();
|
|
|
+ if (partitionOut != null) {
|
|
|
+ partitionOut.close();
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -1847,6 +1863,7 @@ public class MapTask extends Task {
|
|
|
|
|
|
//The output stream for the final single output file
|
|
|
FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);
|
|
|
+ FSDataOutputStream finalPartitionOut = null;
|
|
|
|
|
|
if (numSpills == 0) {
|
|
|
//create dummy files
|
|
@@ -1855,10 +1872,15 @@ public class MapTask extends Task {
|
|
|
try {
|
|
|
for (int i = 0; i < partitions; i++) {
|
|
|
long segmentStart = finalOut.getPos();
|
|
|
- FSDataOutputStream finalPartitionOut = CryptoUtils.wrapIfNecessary(job, finalOut);
|
|
|
+ finalPartitionOut = CryptoUtils.wrapIfNecessary(job, finalOut,
|
|
|
+ false);
|
|
|
Writer<K, V> writer =
|
|
|
new Writer<K, V>(job, finalPartitionOut, keyClass, valClass, codec, null);
|
|
|
writer.close();
|
|
|
+ if (finalPartitionOut != finalOut) {
|
|
|
+ finalPartitionOut.close();
|
|
|
+ finalPartitionOut = null;
|
|
|
+ }
|
|
|
rec.startOffset = segmentStart;
|
|
|
rec.rawLength = writer.getRawLength() + CryptoUtils.cryptoPadding(job);
|
|
|
rec.partLength = writer.getCompressedLength() + CryptoUtils.cryptoPadding(job);
|
|
@@ -1867,6 +1889,9 @@ public class MapTask extends Task {
|
|
|
sr.writeToFile(finalIndexFile, job);
|
|
|
} finally {
|
|
|
finalOut.close();
|
|
|
+ if (finalPartitionOut != null) {
|
|
|
+ finalPartitionOut.close();
|
|
|
+ }
|
|
|
}
|
|
|
sortPhase.complete();
|
|
|
return;
|
|
@@ -1910,7 +1935,7 @@ public class MapTask extends Task {
|
|
|
|
|
|
//write merged output to disk
|
|
|
long segmentStart = finalOut.getPos();
|
|
|
- FSDataOutputStream finalPartitionOut = CryptoUtils.wrapIfNecessary(job, finalOut);
|
|
|
+ finalPartitionOut = CryptoUtils.wrapIfNecessary(job, finalOut, false);
|
|
|
Writer<K, V> writer =
|
|
|
new Writer<K, V>(job, finalPartitionOut, keyClass, valClass, codec,
|
|
|
spilledRecordsCounter);
|
|
@@ -1923,6 +1948,10 @@ public class MapTask extends Task {
|
|
|
|
|
|
//close
|
|
|
writer.close();
|
|
|
+ if (finalPartitionOut != finalOut) {
|
|
|
+ finalPartitionOut.close();
|
|
|
+ finalPartitionOut = null;
|
|
|
+ }
|
|
|
|
|
|
sortPhase.startNextPhase();
|
|
|
|
|
@@ -1934,6 +1963,9 @@ public class MapTask extends Task {
|
|
|
}
|
|
|
spillRec.writeToFile(finalIndexFile, job);
|
|
|
finalOut.close();
|
|
|
+ if (finalPartitionOut != null) {
|
|
|
+ finalPartitionOut.close();
|
|
|
+ }
|
|
|
for(int i = 0; i < numSpills; i++) {
|
|
|
rfs.delete(filename[i],true);
|
|
|
}
|