/**
 * Copyright 2005 The Apache Software Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import org.apache.hadoop.io.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.util.*;

import java.io.*;
import java.util.*;
import java.text.*;

/** A Reduce task. */
class ReduceTask extends Task {

  static {                                        // register a ctor
    WritableFactories.setFactory
      (ReduceTask.class,
       new WritableFactory() {
         public Writable newInstance() { return new ReduceTask(); }
       });
  }
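
  // The map tasks whose output this reduce consumes, the partition of that
  // output assigned to this task, and a flag that tells the sort-progress
  // reporter thread when to stop.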
  private String[][] mapTaskIds;
  private int partition;
  private boolean sortComplete;

  { getProgress().setStatus("reduce"); }
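
  // Overall progress is split into four sequential phases.  The copy of map
  // outputs to the local filesystem happens before run() is invoked
  // (presumably in ReduceTaskRunner), so run() marks the copy phase complete
  // immediately.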
  private Progress copyPhase = getProgress().addPhase("copy");
  private Progress appendPhase = getProgress().addPhase("append");
  private Progress sortPhase = getProgress().addPhase("sort");
  private Progress reducePhase = getProgress().addPhase("reduce");
  private Configuration conf;
  private MapOutputFile mapOutputFile;

  public ReduceTask() {}
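
  /** Construct a reduce task over the outputs of the given map tasks,
   * consuming the given partition of each. */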
  public ReduceTask(String jobFile, String taskId,
                    String[][] mapTaskIds, int partition) {
    super(jobFile, taskId);
    this.mapTaskIds = mapTaskIds;
    this.partition = partition;
  }

  public TaskRunner createRunner(TaskTracker tracker) {
    return new ReduceTaskRunner(this, tracker, this.conf);
  }

  public boolean isMapTask() {
    return false;
  }

  public String[][] getMapTaskIds() { return mapTaskIds; }
  public int getPartition() { return partition; }
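
  /** Serialize the task: the base fields, then the map task ids and the
   * partition number. */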
  public void write(DataOutput out) throws IOException {
    super.write(out);

    out.writeInt(mapTaskIds.length);              // write mapTaskIds
    for (int i = 0; i < mapTaskIds.length; i++) {
      out.writeInt(mapTaskIds[i].length);
      for (int j = 0; j < mapTaskIds[i].length; j++) {
        UTF8.writeString(out, mapTaskIds[i][j]);
      }
    }

    out.writeInt(partition);                      // write partition
  }
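
  /** Deserialize the task, mirroring {@link #write(DataOutput)}. */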
  public void readFields(DataInput in) throws IOException {
    super.readFields(in);

    mapTaskIds = new String[in.readInt()][];      // read mapTaskIds
    for (int i = 0; i < mapTaskIds.length; i++) {
      mapTaskIds[i] = new String[in.readInt()];
      for (int j = 0; j < mapTaskIds[i].length; j++) {
        mapTaskIds[i][j] = UTF8.readString(in);
      }
    }

    this.partition = in.readInt();                // read partition
  }

  /** Iterates values while keys match in sorted input. */
  private class ValuesIterator implements Iterator {
    private SequenceFile.Reader in;               // input file
    private WritableComparable key;               // current key
    private Writable value;                       // current value
    private boolean hasNext;                      // more w/ this key
    private boolean more;                         // more in file
    private float progPerByte;
    private TaskUmbilicalProtocol umbilical;
    private WritableComparator comparator;

    public ValuesIterator(SequenceFile.Reader in, long length,
                          WritableComparator comparator,
                          TaskUmbilicalProtocol umbilical)
      throws IOException {
      this.in = in;
      this.progPerByte = 1.0f / (float)length;
      this.umbilical = umbilical;
      this.comparator = comparator;
      getNext();
    }

    /// Iterator methods

    public boolean hasNext() { return hasNext; }

    public Object next() {
      try {
        Object result = value;                    // save value
        getNext();                                // move to next
        return result;                            // return saved value
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }

    public void remove() { throw new RuntimeException("not implemented"); }

    /// Auxiliary methods

    /** Start processing next unique key. */
    public void nextKey() {
      while (hasNext) { next(); }                 // skip any unread
      hasNext = more;
    }

    /** True iff more keys remain. */
    public boolean more() { return more; }

    /** The current key. */
    public WritableComparable getKey() { return key; }
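
    /** Read the next key/value pair from the sorted file, updating progress
     * and setting hasNext according to whether the new key equals the
     * previous one. */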
    private void getNext() throws IOException {
      reducePhase.set(in.getPosition()*progPerByte); // update progress
      reportProgress(umbilical);

      Writable lastKey = key;                     // save previous key
      try {
        key = (WritableComparable)in.getKeyClass().newInstance();
        value = (Writable)in.getValueClass().newInstance();
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
      more = in.next(key, value);
      if (more) {
        if (lastKey == null) {
          hasNext = true;
        } else {
          hasNext = (comparator.compare(key, lastKey) == 0);
        }
      } else {
        hasNext = false;
      }
    }
  }
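
  /** Run the reduce: append the already-copied map outputs into one local
   * file, sort it by key, then feed each group of values sharing a key to
   * the user's {@link Reducer}. */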
  public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
    throws IOException {
    Class keyClass = job.getOutputKeyClass();
    Class valueClass = job.getOutputValueClass();
    Reducer reducer = (Reducer)job.newInstance(job.getReducerClass());
    reducer.configure(job);
    FileSystem lfs = FileSystem.getNamed("local", job);

    copyPhase.complete();                         // copy is already complete

    // open a file to collect map output
    String file = job.getLocalFile(getTaskId(), "all.1").toString();
    SequenceFile.Writer writer =
      new SequenceFile.Writer(lfs, file, keyClass, valueClass);
    try {
      // append all input files into a single input file
      for (int i = 0; i < mapTaskIds.length; i++) {
        appendPhase.addPhase();                   // one per file
      }
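
      // Each record is appended in its raw serialized form: in.next(buffer)
      // fills the buffer with the key and value and returns the key length,
      // which writer.append() uses to split the two.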
      DataOutputBuffer buffer = new DataOutputBuffer();

      for (int i = 0; i < mapTaskIds.length; i++) {
        File partFile =
          this.mapOutputFile.getInputFile(mapTaskIds[i], getTaskId());
        float progPerByte = 1.0f / lfs.getLength(partFile);
        Progress phase = appendPhase.phase();
        phase.setStatus(partFile.toString());

        SequenceFile.Reader in =
          new SequenceFile.Reader(lfs, partFile.toString(), job);
        try {
          int keyLen;
          while ((keyLen = in.next(buffer)) > 0) {
            writer.append(buffer.getData(), 0, buffer.getLength(), keyLen);
            phase.set(in.getPosition()*progPerByte);
            reportProgress(umbilical);
            buffer.reset();
          }
        } finally {
          in.close();
        }
        phase.complete();
      }
    } finally {
      writer.close();
    }

    appendPhase.complete();                       // append is complete

    // spawn a thread to give sort progress heartbeats
    Thread sortProgress = new Thread() {
        public void run() {
          while (!sortComplete) {
            try {
              reportProgress(umbilical);
              Thread.sleep(PROGRESS_INTERVAL);
            } catch (InterruptedException e) {
              continue;
            } catch (Throwable e) {
              return;
            }
          }
        }
      };
    sortProgress.setName("Sort progress reporter for task "+getTaskId());

    String sortedFile = job.getLocalFile(getTaskId(), "all.2").toString();

    WritableComparator comparator = job.getOutputKeyComparator();

    try {
      sortProgress.start();

      // sort the input file
      SequenceFile.Sorter sorter =
        new SequenceFile.Sorter(lfs, comparator, valueClass, job);
      sorter.sort(file, sortedFile);              // sort
      lfs.delete(new File(file));                 // remove unsorted
    } finally {
      sortComplete = true;
    }

    sortPhase.complete();                         // sort is complete

    // make output collector
    String name = getOutputName(getPartition());
    final RecordWriter out =
      job.getOutputFormat().getRecordWriter(FileSystem.get(job), job, name);
    OutputCollector collector = new OutputCollector() {
        public void collect(WritableComparable key, Writable value)
          throws IOException {
          out.write(key, value);
          reportProgress(umbilical);
        }
      };

    // apply reduce function
    SequenceFile.Reader in = new SequenceFile.Reader(lfs, sortedFile, job);
    Reporter reporter = getReporter(umbilical, getProgress());
    long length = lfs.getLength(new File(sortedFile));
    try {
      ValuesIterator values = new ValuesIterator(in, length, comparator,
                                                 umbilical);
      while (values.more()) {
        reducer.reduce(values.getKey(), values, collector, reporter);
        values.nextKey();
      }
    } finally {
      in.close();
      lfs.delete(new File(sortedFile));           // remove sorted
      out.close(reporter);
    }
    done(umbilical);
  }

  /** Construct output file names so that, when an output directory listing is
   * sorted lexicographically, positions correspond to output partitions. */
  private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();

  static {
    NUMBER_FORMAT.setMinimumIntegerDigits(5);
    NUMBER_FORMAT.setGroupingUsed(false);
  }

  private static synchronized String getOutputName(int partition) {
    return "part-" + NUMBER_FORMAT.format(partition);
  }
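
  /** Set the job configuration and create the helper used to locate this
   * task's local map output files. */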
  public void setConf(Configuration conf) {
    this.conf = conf;
    this.mapOutputFile = new MapOutputFile();
    this.mapOutputFile.setConf(conf);
  }

  public Configuration getConf() {
    return this.conf;
  }
}