|
@@ -108,16 +108,13 @@ public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> {
|
|
|
String name,
|
|
|
Progressable progress)
|
|
|
throws IOException {
|
|
|
-
|
|
|
- String keyValueSeparator = job.get("mapred.textoutputformat.separator", "\t");
|
|
|
- Path dir = getWorkOutputPath(job);
|
|
|
- FileSystem fs = dir.getFileSystem(job);
|
|
|
- if (!fs.exists(dir)) {
|
|
|
- throw new IOException("Output directory doesnt exist");
|
|
|
- }
|
|
|
boolean isCompressed = getCompressOutput(job);
|
|
|
+ String keyValueSeparator = job.get("mapred.textoutputformat.separator",
|
|
|
+ "\t");
|
|
|
if (!isCompressed) {
|
|
|
- FSDataOutputStream fileOut = fs.create(new Path(dir, name), progress);
|
|
|
+ Path file = FileOutputFormat.getTaskOutputPath(job, name);
|
|
|
+ FileSystem fs = file.getFileSystem(job);
|
|
|
+ FSDataOutputStream fileOut = fs.create(file, progress);
|
|
|
return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
|
|
|
} else {
|
|
|
Class<? extends CompressionCodec> codecClass =
|
|
@@ -126,8 +123,11 @@ public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> {
|
|
|
CompressionCodec codec = (CompressionCodec)
|
|
|
ReflectionUtils.newInstance(codecClass, job);
|
|
|
// build the filename including the extension
|
|
|
- Path filename = new Path(dir, name + codec.getDefaultExtension());
|
|
|
- FSDataOutputStream fileOut = fs.create(filename, progress);
|
|
|
+ Path file =
|
|
|
+ FileOutputFormat.getTaskOutputPath(job,
|
|
|
+ name + codec.getDefaultExtension());
|
|
|
+ FileSystem fs = file.getFileSystem(job);
|
|
|
+ FSDataOutputStream fileOut = fs.create(file, progress);
|
|
|
return new LineRecordWriter<K, V>(new DataOutputStream
|
|
|
(codec.createOutputStream(fileOut)),
|
|
|
keyValueSeparator);
|