|
@@ -132,6 +132,10 @@ public class MultipleOutputs<KEYOUT, VALUEOUT> {
|
|
|
* Cache for the taskContexts
|
|
|
*/
|
|
|
private Map<String, TaskAttemptContext> taskContexts = new HashMap<String, TaskAttemptContext>();
|
|
|
+ /**
|
|
|
+ * Cached TaskAttemptContext which uses the job's configured settings
|
|
|
+ */
|
|
|
+ private TaskAttemptContext jobOutputFormatContext;
|
|
|
|
|
|
/**
|
|
|
* Checks if a named output name is valid token.
|
|
@@ -382,11 +386,13 @@ public class MultipleOutputs<KEYOUT, VALUEOUT> {
|
|
|
public void write(KEYOUT key, VALUEOUT value, String baseOutputPath)
|
|
|
throws IOException, InterruptedException {
|
|
|
checkBaseOutputPath(baseOutputPath);
|
|
|
- TaskAttemptContext taskContext =
|
|
|
- new TaskAttemptContextImpl(context.getConfiguration(),
|
|
|
- context.getTaskAttemptID(),
|
|
|
- new WrappedStatusReporter(context));
|
|
|
- getRecordWriter(taskContext, baseOutputPath).write(key, value);
|
|
|
+ if (jobOutputFormatContext == null) {
|
|
|
+ jobOutputFormatContext =
|
|
|
+ new TaskAttemptContextImpl(context.getConfiguration(),
|
|
|
+ context.getTaskAttemptID(),
|
|
|
+ new WrappedStatusReporter(context));
|
|
|
+ }
|
|
|
+ getRecordWriter(jobOutputFormatContext, baseOutputPath).write(key, value);
|
|
|
}
|
|
|
|
|
|
// by being synchronized MultipleOutputTask can be use with a
|