|
@@ -60,48 +60,6 @@ public class TeraInputFormat extends FileInputFormat<Text,Text> {
|
|
|
private static MRJobConfig lastContext = null;
|
|
|
private static List<InputSplit> lastResult = null;
|
|
|
|
|
|
- static class TeraFileSplit extends FileSplit {
|
|
|
- static private String[] ZERO_LOCATIONS = new String[0];
|
|
|
-
|
|
|
- private String[] locations;
|
|
|
-
|
|
|
- public TeraFileSplit() {
|
|
|
- locations = ZERO_LOCATIONS;
|
|
|
- }
|
|
|
- public TeraFileSplit(Path file, long start, long length, String[] hosts) {
|
|
|
- super(file, start, length, hosts);
|
|
|
- try {
|
|
|
- locations = super.getLocations();
|
|
|
- } catch (IOException e) {
|
|
|
- locations = ZERO_LOCATIONS;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- // XXXXXX should this also be null-protected?
|
|
|
- protected void setLocations(String[] hosts) {
|
|
|
- locations = hosts;
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public String[] getLocations() {
|
|
|
- return locations;
|
|
|
- }
|
|
|
-
|
|
|
- public String toString() {
|
|
|
- StringBuffer result = new StringBuffer();
|
|
|
- result.append(getPath());
|
|
|
- result.append(" from ");
|
|
|
- result.append(getStart());
|
|
|
- result.append(" length ");
|
|
|
- result.append(getLength());
|
|
|
- for(String host: getLocations()) {
|
|
|
- result.append(" ");
|
|
|
- result.append(host);
|
|
|
- }
|
|
|
- return result.toString();
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
static class TextSampler implements IndexedSortable {
|
|
|
private ArrayList<Text> records = new ArrayList<Text>();
|
|
|
|
|
@@ -325,11 +283,6 @@ public class TeraInputFormat extends FileInputFormat<Text,Text> {
|
|
|
return new TeraRecordReader();
|
|
|
}
|
|
|
|
|
|
- protected FileSplit makeSplit(Path file, long start, long length,
|
|
|
- String[] hosts) {
|
|
|
- return new TeraFileSplit(file, start, length, hosts);
|
|
|
- }
|
|
|
-
|
|
|
@Override
|
|
|
public List<InputSplit> getSplits(JobContext job) throws IOException {
|
|
|
if (job == lastContext) {
|
|
@@ -343,7 +296,7 @@ public class TeraInputFormat extends FileInputFormat<Text,Text> {
|
|
|
System.out.println("Spent " + (t2 - t1) + "ms computing base-splits.");
|
|
|
if (job.getConfiguration().getBoolean(TeraScheduler.USE, true)) {
|
|
|
TeraScheduler scheduler = new TeraScheduler(
|
|
|
- lastResult.toArray(new TeraFileSplit[0]), job.getConfiguration());
|
|
|
+ lastResult.toArray(new FileSplit[0]), job.getConfiguration());
|
|
|
lastResult = scheduler.getNewFileSplits();
|
|
|
t3 = System.currentTimeMillis();
|
|
|
System.out.println("Spent " + (t3 - t2) + "ms computing TeraScheduler splits.");
|