@@ -49,6 +49,7 @@ import org.apache.hadoop.util.ToolRunner;
  */
 public class TeraSort extends Configured implements Tool {
   private static final Log LOG = LogFactory.getLog(TeraSort.class);
+  private static final String OUTPUT_REPLICATION = "mapreduce.terasort.output.replication";
 
   /**
    * A partitioner that splits text keys into roughly equal partitions
@@ -222,6 +223,10 @@ public class TeraSort extends Configured implements Tool {
     }
 
   }
+
+  public static int getOutputReplication(JobConf job) {
+    return job.getInt(OUTPUT_REPLICATION, 1);
+  }
 
   public int run(String[] args) throws Exception {
     LOG.info("starting");
@@ -243,7 +248,7 @@ public class TeraSort extends Configured implements Tool {
     TeraInputFormat.writePartitionFile(job, partitionFile);
     DistributedCache.addCacheFile(partitionUri, job);
     DistributedCache.createSymlink(job);
-    job.setInt("dfs.replication", 1);
+    job.setInt("dfs.replication", getOutputReplication(job));
     TeraOutputFormat.setFinalSync(job, true);
     JobClient.runJob(job);
     LOG.info("done");
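
Usage note (a hedged sketch, not part of the patch): the new accessor reads mapreduce.terasort.output.replication and falls back to the previous hard-coded value of 1 when the property is unset. Assuming the stock org.apache.hadoop.examples.terasort package and a ToolRunner launch (so that -Dmapreduce.terasort.output.replication=N is picked up by GenericOptionsParser), the resolution behaves roughly like this; ReplicationCheck is a hypothetical driver written only for illustration:

    import org.apache.hadoop.examples.terasort.TeraSort;
    import org.apache.hadoop.mapred.JobConf;

    public class ReplicationCheck {
      public static void main(String[] args) {
        JobConf job = new JobConf();

        // Property unset: falls back to the old hard-coded replication of 1.
        System.out.println(TeraSort.getOutputReplication(job));   // prints 1

        // Property set, e.g. via -Dmapreduce.terasort.output.replication=3
        // on the command line or explicitly in code:
        job.setInt("mapreduce.terasort.output.replication", 3);
        System.out.println(TeraSort.getOutputReplication(job));   // prints 3
      }
    }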