|
@@ -314,14 +314,16 @@ public class SortValidator extends Configured implements Tool {
|
|
|
|
|
|
static void checkRecords(Configuration defaults,
|
|
|
Path sortInput, Path sortOutput) throws IOException {
|
|
|
- FileSystem fs = FileSystem.get(defaults);
|
|
|
+ FileSystem inputfs = sortInput.getFileSystem(defaults);
|
|
|
+ FileSystem outputfs = sortOutput.getFileSystem(defaults);
|
|
|
+ FileSystem defaultfs = FileSystem.get(defaults);
|
|
|
JobConf jobConf = new JobConf(defaults, RecordStatsChecker.class);
|
|
|
jobConf.setJobName("sortvalidate-recordstats-checker");
|
|
|
|
|
|
int noSortReduceTasks =
|
|
|
- fs.listStatus(sortOutput, sortPathsFilter).length;
|
|
|
+ outputfs.listStatus(sortOutput, sortPathsFilter).length;
|
|
|
jobConf.setInt("sortvalidate.sort.reduce.tasks", noSortReduceTasks);
|
|
|
- int noSortInputpaths = fs.listStatus(sortInput).length;
|
|
|
+ int noSortInputpaths = inputfs.listStatus(sortInput).length;
|
|
|
|
|
|
jobConf.setInputFormat(NonSplitableSequenceFileInputFormat.class);
|
|
|
jobConf.setOutputFormat(SequenceFileOutputFormat.class);
|
|
@@ -339,8 +341,8 @@ public class SortValidator extends Configured implements Tool {
|
|
|
FileInputFormat.setInputPaths(jobConf, sortInput);
|
|
|
FileInputFormat.addInputPath(jobConf, sortOutput);
|
|
|
Path outputPath = new Path("/tmp/sortvalidate/recordstatschecker");
|
|
|
- if (fs.exists(outputPath)) {
|
|
|
- fs.delete(outputPath, true);
|
|
|
+ if (defaultfs.exists(outputPath)) {
|
|
|
+ defaultfs.delete(outputPath, true);
|
|
|
}
|
|
|
FileOutputFormat.setOutputPath(jobConf, outputPath);
|
|
|
|
|
@@ -365,7 +367,7 @@ public class SortValidator extends Configured implements Tool {
|
|
|
|
|
|
// Check to ensure that the statistics of the
|
|
|
// framework's sort-input and sort-output match
|
|
|
- SequenceFile.Reader stats = new SequenceFile.Reader(fs,
|
|
|
+ SequenceFile.Reader stats = new SequenceFile.Reader(defaultfs,
|
|
|
new Path(outputPath, "part-00000"), defaults);
|
|
|
IntWritable k1 = new IntWritable();
|
|
|
IntWritable k2 = new IntWritable();
|