|
@@ -467,26 +467,24 @@ class MapTask extends Task {
|
|
{
|
|
{
|
|
Path [] filename = new Path[numSpills];
|
|
Path [] filename = new Path[numSpills];
|
|
Path [] indexFileName = new Path[numSpills];
|
|
Path [] indexFileName = new Path[numSpills];
|
|
- FSDataInputStream in[] = new FSDataInputStream[numSpills];
|
|
|
|
- FSDataInputStream indexIn[] = new FSDataInputStream[numSpills];
|
|
|
|
|
|
|
|
for(int i = 0; i < numSpills; i++) {
|
|
for(int i = 0; i < numSpills; i++) {
|
|
filename[i] = mapOutputFile.getSpillFile(getTaskId(), i);
|
|
filename[i] = mapOutputFile.getSpillFile(getTaskId(), i);
|
|
- in[i] = localFs.open(filename[i]);
|
|
|
|
indexFileName[i] = mapOutputFile.getSpillIndexFile(getTaskId(), i);
|
|
indexFileName[i] = mapOutputFile.getSpillIndexFile(getTaskId(), i);
|
|
- indexIn[i] = localFs.open(indexFileName[i]);
|
|
|
|
}
|
|
}
|
|
|
|
|
|
//create a sorter object as we need access to the SegmentDescriptor
|
|
//create a sorter object as we need access to the SegmentDescriptor
|
|
//class and merge methods
|
|
//class and merge methods
|
|
Sorter sorter = new Sorter(localFs, keyClass, valClass, job);
|
|
Sorter sorter = new Sorter(localFs, keyClass, valClass, job);
|
|
- sorter.setFactor(numSpills);
|
|
|
|
|
|
|
|
for (int parts = 0; parts < partitions; parts++){
|
|
for (int parts = 0; parts < partitions; parts++){
|
|
List<SegmentDescriptor> segmentList = new ArrayList(numSpills);
|
|
List<SegmentDescriptor> segmentList = new ArrayList(numSpills);
|
|
for(int i = 0; i < numSpills; i++) {
|
|
for(int i = 0; i < numSpills; i++) {
|
|
- long segmentOffset = indexIn[i].readLong();
|
|
|
|
- long segmentLength = indexIn[i].readLong();
|
|
|
|
|
|
+ FSDataInputStream indexIn = localFs.open(indexFileName[i]);
|
|
|
|
+ indexIn.seek(parts * 16);
|
|
|
|
+ long segmentOffset = indexIn.readLong();
|
|
|
|
+ long segmentLength = indexIn.readLong();
|
|
|
|
+ indexIn.close();
|
|
SegmentDescriptor s = sorter.new SegmentDescriptor(segmentOffset,
|
|
SegmentDescriptor s = sorter.new SegmentDescriptor(segmentOffset,
|
|
segmentLength, filename[i]);
|
|
segmentLength, filename[i]);
|
|
s.preserveInput(true);
|
|
s.preserveInput(true);
|
|
@@ -513,8 +511,8 @@ class MapTask extends Task {
|
|
finalIndexOut.close();
|
|
finalIndexOut.close();
|
|
//cleanup
|
|
//cleanup
|
|
for(int i = 0; i < numSpills; i++) {
|
|
for(int i = 0; i < numSpills; i++) {
|
|
- in[i].close(); localFs.delete(filename[i]);
|
|
|
|
- indexIn[i].close(); localFs.delete(indexFileName[i]);
|
|
|
|
|
|
+ localFs.delete(filename[i]);
|
|
|
|
+ localFs.delete(indexFileName[i]);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|