ReduceTask.java 46 KB

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URL;
import java.net.URLClassLoader;
import java.text.DecimalFormat;
import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.InMemoryFileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableFactories;
import org.apache.hadoop.io.WritableFactory;
import org.apache.hadoop.metrics.MetricsContext;
import org.apache.hadoop.metrics.MetricsRecord;
import org.apache.hadoop.metrics.MetricsUtil;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;

import static org.apache.hadoop.mapred.Task.Counter.*;
/** A Reduce task. */
class ReduceTask extends Task {

  static {                                        // register a ctor
    WritableFactories.setFactory
      (ReduceTask.class,
       new WritableFactory() {
         public Writable newInstance() { return new ReduceTask(); }
       });
  }

  private static final Log LOG = LogFactory.getLog(ReduceTask.class.getName());
  private int numMaps;
  AtomicBoolean sortComplete = new AtomicBoolean(false);
  private ReduceCopier reduceCopier;

  {
    getProgress().setStatus("reduce");
    setPhase(TaskStatus.Phase.SHUFFLE);           // phase to start with
  }

  private Progress copyPhase = getProgress().addPhase("copy");
  private Progress sortPhase = getProgress().addPhase("sort");
  private Progress reducePhase = getProgress().addPhase("reduce");

  public ReduceTask() {}

  public ReduceTask(String jobId, String jobFile, String tipId, String taskId,
                    int partition, int numMaps) {
    super(jobId, jobFile, tipId, taskId, partition);
    this.numMaps = numMaps;
  }

  public TaskRunner createRunner(TaskTracker tracker) throws IOException {
    return new ReduceTaskRunner(this, tracker, this.conf);
  }

  public boolean isMapTask() {
    return false;
  }

  public int getNumMaps() { return numMaps; }

  /**
   * Localize the given JobConf to be specific for this task.
   */
  public void localizeConfiguration(JobConf conf) throws IOException {
    super.localizeConfiguration(conf);
    conf.setNumMapTasks(numMaps);
  }

  public void write(DataOutput out) throws IOException {
    super.write(out);
    out.writeInt(numMaps);                        // write the number of maps
  }

  public void readFields(DataInput in) throws IOException {
    super.readFields(in);
    numMaps = in.readInt();
  }
  /** Iterates values while keys match in sorted input. */
  static class ValuesIterator implements Iterator {
    private SequenceFile.Sorter.RawKeyValueIterator in; // input iterator
    private WritableComparable key;               // current key
    private Writable value;                       // current value
    private boolean hasNext;                      // more w/ this key
    private boolean more;                         // more in file
    private WritableComparator comparator;
    private Class keyClass;
    private Class valClass;
    private Configuration conf;
    private DataOutputBuffer valOut = new DataOutputBuffer();
    private DataInputBuffer valIn = new DataInputBuffer();
    private DataInputBuffer keyIn = new DataInputBuffer();
    protected Reporter reporter;

    public ValuesIterator(SequenceFile.Sorter.RawKeyValueIterator in,
                          WritableComparator comparator, Class keyClass,
                          Class valClass, Configuration conf,
                          Reporter reporter)
      throws IOException {
      this.in = in;
      this.conf = conf;
      this.comparator = comparator;
      this.keyClass = keyClass;
      this.valClass = valClass;
      this.reporter = reporter;
      getNext();
    }

    /// Iterator methods

    public boolean hasNext() { return hasNext; }

    public Object next() {
      Object result = value;                      // save value
      try {
        getNext();                                // move to next
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
      // ignore the error, since failures in progress shouldn't kill us
      try {
        reporter.progress();
      } catch (IOException ie) {
        LOG.debug("caught exception from progress", ie);
      }
      return result;                              // return saved value
    }

    public void remove() { throw new RuntimeException("not implemented"); }

    /// Auxiliary methods

    /** Start processing next unique key. */
    public void nextKey() {
      while (hasNext) { next(); }                 // skip any unread
      hasNext = more;
    }

    /** True iff more keys remain. */
    public boolean more() { return more; }

    /** The current key. */
    public WritableComparable getKey() { return key; }

    private void getNext() throws IOException {
      Writable lastKey = key;                     // save previous key
      try {
        key = (WritableComparable)ReflectionUtils.newInstance(keyClass, this.conf);
        value = (Writable)ReflectionUtils.newInstance(valClass, this.conf);
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
      more = in.next();
      if (more) {
        // de-serialize the raw key/value
        keyIn.reset(in.getKey().getData(), in.getKey().getLength());
        key.readFields(keyIn);
        valOut.reset();
        (in.getValue()).writeUncompressedBytes(valOut);
        valIn.reset(valOut.getData(), valOut.getLength());
        value.readFields(valIn);
        if (lastKey == null) {
          hasNext = true;
        } else {
          hasNext = (comparator.compare(key, lastKey) == 0);
        }
      } else {
        hasNext = false;
      }
    }
  }
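
  // Illustrative usage sketch for the iterator above (rawIter and cmp are
  // hypothetical names for a sorted RawKeyValueIterator and a grouping
  // comparator): a caller walks the key groups roughly like this:
  //
  //   ValuesIterator values =
  //     new ValuesIterator(rawIter, cmp, keyClass, valClass, conf, reporter);
  //   while (values.more()) {
  //     WritableComparable key = values.getKey();  // key for this group
  //     while (values.hasNext()) {
  //       Writable v = (Writable)values.next();    // values sharing that key
  //     }
  //     values.nextKey();                          // advance to next group
  //   }
  //
  // run() below follows the same pattern, except that the user's Reducer
  // consumes the per-key values directly.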
  private class ReduceValuesIterator extends ValuesIterator {
    public ReduceValuesIterator(SequenceFile.Sorter.RawKeyValueIterator in,
                                WritableComparator comparator, Class keyClass,
                                Class valClass,
                                Configuration conf, Reporter reporter)
      throws IOException {
      super(in, comparator, keyClass, valClass, conf, reporter);
    }

    public void informReduceProgress() {
      reducePhase.set(super.in.getProgress().get()); // update progress
      try {
        reporter.progress();
      } catch (IOException ie) {
        LOG.debug("Exception caught from progress", ie);
      }
    }

    public Object next() {
      reporter.incrCounter(REDUCE_INPUT_RECORDS, 1);
      return super.next();
    }
  }
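
  // run() below drives the three reduce-side phases in order: SHUFFLE
  // (ReduceCopier fetches map outputs; skipped for local jobs), SORT
  // (SequenceFile.Sorter merges the fetched files), and REDUCE (the user's
  // Reducer consumes the grouped values through a ReduceValuesIterator).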
  public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
    throws IOException {
    Class valueClass = job.getMapOutputValueClass();
    Reducer reducer = (Reducer)ReflectionUtils.newInstance(
                                 job.getReducerClass(), job);
    FileSystem lfs = FileSystem.getLocal(job);

    if (!job.get("mapred.job.tracker", "local").equals("local")) {
      reduceCopier = new ReduceCopier(umbilical, job);
      if (!reduceCopier.fetchOutputs()) {
        throw new IOException(getTaskId() + ": The reduce copier failed");
      }
    }
    copyPhase.complete();                         // copy is already complete

    // open a file to collect map output
    // since we don't know how many map outputs got merged in memory, we have
    // to check whether a given map output exists, and if it does, add it to
    // the list of files to merge, otherwise not.
    List<Path> mapFilesList = new ArrayList<Path>();
    for (int i = 0; i < numMaps; i++) {
      Path f;
      try {
        // catch and ignore DiskErrorException, since some map outputs will
        // really be absent (inmem merge).
        f = mapOutputFile.getInputFile(i, getTaskId());
      } catch (DiskErrorException d) {
        continue;
      }
      if (lfs.exists(f))
        mapFilesList.add(f);
    }
    Path[] mapFiles = new Path[mapFilesList.size()];
    mapFiles = mapFilesList.toArray(mapFiles);

    // spawn a thread to give sort progress heartbeats
    Thread sortProgress = new Thread() {
      public void run() {
        while (!sortComplete.get()) {
          try {
            reportProgress(umbilical);
            Thread.sleep(PROGRESS_INTERVAL);
          } catch (InterruptedException e) {
            return;
          } catch (Throwable e) {
            System.out.println("Thread Exception in " +
                               "reporting sort progress\n" +
                               StringUtils.stringifyException(e));
            continue;
          }
        }
      }
    };
    sortProgress.setDaemon(true);
    sortProgress.setName("Sort progress reporter for task " + getTaskId());

    Path tempDir = new Path(getTaskId());
    WritableComparator comparator = job.getOutputValueGroupingComparator();
    SequenceFile.Sorter.RawKeyValueIterator rIter;
    try {
      setPhase(TaskStatus.Phase.SORT);
      sortProgress.start();

      // sort the input file
      SequenceFile.Sorter sorter =
        new SequenceFile.Sorter(lfs, comparator, valueClass, job);
      rIter = sorter.merge(mapFiles, tempDir,
                           !conf.getKeepFailedTaskFiles()); // sort
    } finally {
      sortComplete.set(true);
    }
    sortPhase.complete();                         // sort is complete
    setPhase(TaskStatus.Phase.REDUCE);

    final Reporter reporter = getReporter(umbilical);

    // make output collector
    String finalName = getOutputName(getPartition());
    FileSystem fs = FileSystem.get(job);
    final RecordWriter out =
      job.getOutputFormat().getRecordWriter(fs, job, finalName, reporter);
    OutputCollector collector = new OutputCollector() {
      public void collect(WritableComparable key, Writable value)
        throws IOException {
        out.write(key, value);
        reporter.incrCounter(REDUCE_OUTPUT_RECORDS, 1);
        reportProgress(umbilical);
      }
    };

    // apply reduce function
    try {
      Class keyClass = job.getMapOutputKeyClass();
      Class valClass = job.getMapOutputValueClass();
      ReduceValuesIterator values = new ReduceValuesIterator(rIter, comparator,
                                          keyClass, valClass, job, reporter);
      values.informReduceProgress();
      while (values.more()) {
        reporter.incrCounter(REDUCE_INPUT_GROUPS, 1);
        reducer.reduce(values.getKey(), values, collector, reporter);
        values.nextKey();
        values.informReduceProgress();
      }

      // Clean up: repeated in catch block below
      reducer.close();
      out.close(reporter);
      // End of clean up.
    } catch (IOException ioe) {
      try {
        reducer.close();
      } catch (IOException ignored) {}
      try {
        out.close(reporter);
      } catch (IOException ignored) {}
      throw ioe;
    }
    done(umbilical);
  }
  /** Construct output file names so that, when an output directory listing is
   * sorted lexicographically, positions correspond to output partitions. */
  private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
  static {
    NUMBER_FORMAT.setMinimumIntegerDigits(5);
    NUMBER_FORMAT.setGroupingUsed(false);
  }

  static synchronized String getOutputName(int partition) {
    return "part-" + NUMBER_FORMAT.format(partition);
  }
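
  // For example, getOutputName(0) yields "part-00000" and getOutputName(42)
  // yields "part-00042"; zero-padding to five digits keeps a
  // lexicographically sorted directory listing in partition order.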
  private class ReduceCopier implements MRConstants {

    /** Reference to the umbilical object */
    private TaskUmbilicalProtocol umbilical;

    /** Number of ms before timing out a copy */
    private static final int STALLED_COPY_TIMEOUT = 3 * 60 * 1000;

    /**
     * our reduce task instance
     */
    private ReduceTask reduceTask;

    /**
     * the list of map outputs currently being copied
     */
    private List<MapOutputLocation> scheduledCopies;

    /**
     * the results of dispatched copy attempts
     */
    private List<CopyResult> copyResults;

    /**
     * the number of outputs to copy in parallel
     */
    private int numCopiers;

    /**
     * the maximum extra time (beyond one minute) to wait before contacting
     * a host again after a copy from it fails; we wait for
     * (1 min + Random.nextInt(maxBackoff)) seconds.
     */
    private int maxBackoff;

    /**
     * busy hosts from which copies are being backed off
     * Map of host -> next contact time
     */
    private Map<String, Long> penaltyBox;

    /**
     * the set of unique hosts from which we are copying
     */
    private Set<String> uniqueHosts;

    /**
     * the last time we polled the job tracker
     */
    private long lastPollTime;

    /**
     * A reference to the in memory file system for writing the map outputs to.
     */
    private InMemoryFileSystem inMemFileSys;

    /**
     * A reference to the local file system for writing the map outputs to.
     */
    private FileSystem localFileSys;

    /**
     * An instance of the sorter used for doing merge
     */
    private SequenceFile.Sorter sorter;

    /**
     * A reference to the throwable object (if merge throws an exception)
     */
    private volatile Throwable mergeThrowable;

    /**
     * A flag to indicate that merge is in progress
     */
    private volatile boolean mergeInProgress = false;

    /**
     * When we accumulate mergeThreshold number of files in ram, we merge/spill
     */
    private int mergeThreshold = 500;

    /**
     * The threads for fetching the files.
     */
    private MapOutputCopier[] copiers = null;

    /**
     * The metrics record for shuffle input.
     */
    private MetricsRecord shuffleMetrics = null;

    /**
     * the minimum interval between tasktracker polls
     */
    private static final long MIN_POLL_INTERVAL = 1000;

    /**
     * the number of map output locations to poll for at one time
     */
    private int probe_sample_size = 100;

    /**
     * a list of map output locations for fetch retries
     */
    private List<MapOutputLocation> retryFetches =
      new ArrayList<MapOutputLocation>();

    /**
     * a TreeSet for needed map outputs
     */
    private Set<Integer> neededOutputs =
      Collections.synchronizedSet(new TreeSet<Integer>());

    private Random random = null;

    /**
     * the max size of the merge output from ramfs
     */
    private long ramfsMergeOutputSize;
    /** Represents the result of an attempt to copy a map output */
    private class CopyResult {

      // the map output location against which a copy attempt was made
      private final MapOutputLocation loc;

      // the size of the file copied, -1 if the transfer failed
      private final long size;

      // a flag signifying whether a copy result is obsolete
      private static final int OBSOLETE = -2;

      CopyResult(MapOutputLocation loc, long size) {
        this.loc = loc;
        this.size = size;
      }

      public int getMapId() { return loc.getMapId(); }
      public boolean getSuccess() { return size >= 0; }
      public boolean isObsolete() {
        return size == OBSOLETE;
      }
      public long getSize() { return size; }
      public String getHost() { return loc.getHost(); }
      public MapOutputLocation getLocation() { return loc; }
    }

    private int extractMapIdFromPathName(Path pathname) {
      // all paths end with map_<id>.out
      String firstPathName = pathname.getName();
      int beginIndex = firstPathName.lastIndexOf("map_");
      int endIndex = firstPathName.lastIndexOf(".out");
      return Integer.parseInt(firstPathName.substring(beginIndex +
                                "map_".length(), endIndex));
    }
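
    // For example, a path whose name is "map_7.out" yields 7. The parse
    // assumes every shuffle file follows the map_<id>.out convention used
    // when the copiers write their output.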
    private Thread createProgressThread(final TaskUmbilicalProtocol umbilical) {
      // spawn a thread to give copy progress heartbeats
      Thread copyProgress = new Thread() {
        public void run() {
          LOG.debug("Started thread: " + getName());
          while (true) {
            try {
              reportProgress(umbilical);
              Thread.sleep(PROGRESS_INTERVAL);
            } catch (InterruptedException e) {
              return;
            } catch (Throwable e) {
              LOG.info("Thread Exception in " +
                       "reporting copy progress\n" +
                       StringUtils.stringifyException(e));
              continue;
            }
          }
        }
      };
      copyProgress.setName("Copy progress reporter for task " + getTaskId());
      copyProgress.setDaemon(true);
      return copyProgress;
    }
    private int nextMapOutputCopierId = 0;

    /** Copies map outputs as they become available */
    private class MapOutputCopier extends Thread {

      private MapOutputLocation currentLocation = null;
      private int id = nextMapOutputCopierId++;

      public MapOutputCopier() {
        setName("MapOutputCopier " + reduceTask.getTaskId() + "." + id);
        LOG.debug(getName() + " created");
      }

      /**
       * Fail the current file that we are fetching
       * @return were we currently fetching?
       */
      public synchronized boolean fail() {
        if (currentLocation != null) {
          finish(-1);
          return true;
        } else {
          return false;
        }
      }

      /**
       * Get the current map output location.
       */
      public synchronized MapOutputLocation getLocation() {
        return currentLocation;
      }

      private synchronized void start(MapOutputLocation loc) {
        currentLocation = loc;
      }

      private synchronized void finish(long size) {
        if (currentLocation != null) {
          LOG.debug(getName() + " finishing " + currentLocation + " =" + size);
          synchronized (copyResults) {
            copyResults.add(new CopyResult(currentLocation, size));
            copyResults.notify();
          }
          currentLocation = null;
        }
      }

      /** Loop forever and fetch map outputs as they become available.
       * The thread exits when it is interrupted by {@link ReduceTaskRunner}
       */
      public void run() {
        while (true) {
          try {
            MapOutputLocation loc = null;
            long size = -1;
            synchronized (scheduledCopies) {
              while (scheduledCopies.isEmpty()) {
                scheduledCopies.wait();
              }
              loc = scheduledCopies.remove(0);
            }
            try {
              start(loc);
              size = copyOutput(loc);
            } catch (IOException e) {
              LOG.warn(reduceTask.getTaskId() + " copy failed: " +
                       loc.getMapTaskId() + " from " + loc.getHost());
              LOG.warn(StringUtils.stringifyException(e));
            } finally {
              finish(size);
            }
          } catch (InterruptedException e) {
            return;                               // ALL DONE
          } catch (Throwable th) {
            LOG.error("Map output copy failure: " +
                      StringUtils.stringifyException(th));
          }
        }
      }
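
      // Copier threads take work from scheduledCopies (filled by
      // fetchOutputs) and hand results back through copyResults (drained by
      // getCopyResult); each list serves as its own monitor for the
      // producer/consumer handoff above.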
      /** Copies a map output from a remote host, using raw RPC.
       * @param loc the map output location to be copied
       * @return the number of bytes copied, or CopyResult.OBSOLETE if the
       * output is no longer needed
       * @throws IOException if there is an error copying the file
       * @throws InterruptedException if the copier should give up
       */
      private long copyOutput(MapOutputLocation loc
                              ) throws IOException, InterruptedException {
        if (!neededOutputs.contains(loc.getMapId())) {
          return CopyResult.OBSOLETE;
        }
        String reduceId = reduceTask.getTaskId();
        LOG.info(reduceId + " Copying " + loc.getMapTaskId() +
                 " output from " + loc.getHost() + ".");

        // a temp filename. If this file gets created in ramfs, we're fine,
        // else, we will check the localFS to find a suitable final location
        // for this path
        Path filename = new Path("/" + reduceId + "/map_" +
                                 loc.getMapId() + ".out");
        // a working filename that will be unique to this attempt
        Path tmpFilename = new Path(filename + "-" + id);
        // this copies the map output file
        tmpFilename = loc.getFile(inMemFileSys, localFileSys, shuffleMetrics,
                                  tmpFilename, lDirAlloc,
                                  conf, reduceTask.getPartition(),
                                  STALLED_COPY_TIMEOUT);
        if (!neededOutputs.contains(loc.getMapId())) {
          if (tmpFilename != null) {
            FileSystem fs = tmpFilename.getFileSystem(conf);
            fs.delete(tmpFilename);
          }
          return CopyResult.OBSOLETE;
        }
        if (tmpFilename == null)
          throw new IOException("File " + filename + "-" + id +
                                " not created");
        long bytes = -1;

        // lock the ReduceTask while we do the rename
        synchronized (ReduceTask.this) {
          // This file could have been created in the inmemory fs or the
          // localfs, so we need to get the filesystem owning the path.
          FileSystem fs = tmpFilename.getFileSystem(conf);
          if (!neededOutputs.contains(loc.getMapId())) {
            fs.delete(tmpFilename);
            return CopyResult.OBSOLETE;
          }
          bytes = fs.getLength(tmpFilename);

          // resolve the final filename against the directory where the
          // tmpFile got created
          filename = new Path(tmpFilename.getParent(), filename.getName());
          // if we can't rename the file, something is broken (and an
          // IOException will be thrown).
          if (!fs.rename(tmpFilename, filename)) {
            fs.delete(tmpFilename);
            bytes = -1;
            throw new IOException("failure to rename map output " +
                                  tmpFilename);
          }
          LOG.info(reduceId + " done copying " + loc.getMapTaskId() +
                   " output from " + loc.getHost() + ".");

          // Create a thread to do merges. Synchronize access/update to
          // mergeInProgress
          if (!mergeInProgress &&
              (inMemFileSys.getPercentUsed() >= MAX_INMEM_FILESYS_USE ||
               (mergeThreshold > 0 &&
                inMemFileSys.getNumFiles(MAP_OUTPUT_FILTER) >=
                mergeThreshold)) &&
              mergeThrowable == null) {
            LOG.info(reduceId + " InMemoryFileSystem " +
                     inMemFileSys.getUri().toString() +
                     " is " + inMemFileSys.getPercentUsed() +
                     " full. Triggering merge");
            InMemFSMergeThread m = new InMemFSMergeThread(inMemFileSys,
                                     (LocalFileSystem)localFileSys, sorter);
            m.setName("Thread for merging in memory files");
            m.setDaemon(true);
            mergeInProgress = true;
            m.start();
          }
          neededOutputs.remove(loc.getMapId());
        }
        return bytes;
      }
    }
    private void configureClasspath(JobConf conf)
      throws IOException {

      // get the task and the current classloader which will become the parent
      Task task = ReduceTask.this;
      ClassLoader parent = conf.getClassLoader();

      // get the work directory which holds the elements we are dynamically
      // adding to the classpath
      File workDir = new File(task.getJobFile()).getParentFile();
      File jobCacheDir = new File(workDir.getParent(), "work");
      ArrayList<URL> urllist = new ArrayList<URL>();

      // add the jars and directories to the classpath
      String jar = conf.getJar();
      if (jar != null) {
        File[] libs = new File(jobCacheDir, "lib").listFiles();
        if (libs != null) {
          for (int i = 0; i < libs.length; i++) {
            urllist.add(libs[i].toURL());
          }
        }
        urllist.add(new File(jobCacheDir, "classes").toURL());
        urllist.add(jobCacheDir.toURL());
      }
      urllist.add(workDir.toURL());

      // create a new classloader with the old classloader as its parent
      // then set that classloader as the one used by the current jobconf
      URL[] urls = urllist.toArray(new URL[urllist.size()]);
      URLClassLoader loader = new URLClassLoader(urls, parent);
      conf.setClassLoader(loader);
    }
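
    // The classloader built above layers the job's lib/ jars, its classes/
    // directory, the unpacked job area, and the task work directory on top
    // of the parent loader, so user-supplied classes resolve during the
    // copy phase.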
    public ReduceCopier(TaskUmbilicalProtocol umbilical, JobConf conf)
      throws IOException {
      configureClasspath(conf);
      this.umbilical = umbilical;
      this.reduceTask = ReduceTask.this;
      this.scheduledCopies = new ArrayList<MapOutputLocation>(100);
      this.copyResults = new ArrayList<CopyResult>(100);
      this.numCopiers = conf.getInt("mapred.reduce.parallel.copies", 5);
      this.maxBackoff = conf.getInt("mapred.reduce.copy.backoff", 300);
      this.mergeThreshold = conf.getInt("mapred.inmem.merge.threshold", 1000);

      // we want to distinguish inmem fs instances for different reduces.
      // Hence, append a unique string in the uri for the inmem fs name
      URI uri = URI.create("ramfs://mapoutput" + reduceTask.hashCode());
      inMemFileSys = (InMemoryFileSystem)FileSystem.get(uri, conf);
      LOG.info(reduceTask.getTaskId() + " Created an InMemoryFileSystem, uri: "
               + uri);
      ramfsMergeOutputSize = (long)(MAX_INMEM_FILESYS_USE *
                                    inMemFileSys.getFSSize());
      localFileSys = FileSystem.getLocal(conf);

      // create an instance of the sorter
      sorter =
        new SequenceFile.Sorter(inMemFileSys, conf.getOutputKeyComparator(),
                                conf.getMapOutputValueClass(), conf);

      // hosts -> next contact time
      this.penaltyBox = new Hashtable<String, Long>();

      // hostnames
      this.uniqueHosts = new HashSet<String>();

      this.lastPollTime = 0;

      MetricsContext metricsContext = MetricsUtil.getContext("mapred");
      this.shuffleMetrics =
        MetricsUtil.createRecord(metricsContext, "shuffleInput");
      this.shuffleMetrics.setTag("user", conf.getUser());

      // Seed the random number generator with a reasonably globally unique seed
      long randomSeed = System.nanoTime() +
                        (long)Math.pow(this.reduceTask.getPartition(),
                                       (this.reduceTask.getPartition()%10)
                                      );
      this.random = new Random(randomSeed);
    }
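
    // A host that fails a fetch is penalized for (60 seconds +
    // Random.nextInt(maxBackoff) seconds) before the next contact; see the
    // penaltyBox bookkeeping in fetchOutputs() below.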
    public boolean fetchOutputs() throws IOException {
      final int numOutputs = reduceTask.getNumMaps();
      List<MapOutputLocation> knownOutputs =
        new ArrayList<MapOutputLocation>(numCopiers);
      int numInFlight = 0, numCopied = 0;
      int lowThreshold = numCopiers*2;
      long bytesTransferred = 0;
      DecimalFormat mbpsFormat = new DecimalFormat("0.00");
      Random backoff = new Random();
      final Progress copyPhase =
        reduceTask.getProgress().phase();

      // tweak the probe sample size (make it a function of numCopiers)
      probe_sample_size = Math.max(numCopiers*5, 50);

      for (int i = 0; i < numOutputs; i++) {
        neededOutputs.add(i);
        copyPhase.addPhase();                     // add sub-phase per file
      }

      copiers = new MapOutputCopier[numCopiers];

      // start all the copying threads
      for (int i = 0; i < copiers.length; i++) {
        copiers[i] = new MapOutputCopier();
        copiers[i].start();
      }

      // start the clock for bandwidth measurement
      long startTime = System.currentTimeMillis();
      long currentTime = startTime;
      IntWritable fromEventId = new IntWritable(0);

      Thread copyProgress = createProgressThread(umbilical);
      copyProgress.start();
      try {
        // loop until we get all required outputs
        while (!neededOutputs.isEmpty() && mergeThrowable == null) {

          LOG.info(reduceTask.getTaskId() + " Need " + neededOutputs.size() +
                   " map output(s)");

          try {
            // Put the hash entries for the failed fetches. Entries here
            // might be replaced by (mapId) hashkeys from new successful
            // Map executions, if the fetch failures were due to lost tasks.
            // The replacements, if at all, will happen when we query the
            // tasktracker and put the mapId hashkeys with new
            // MapOutputLocations as values
            knownOutputs.addAll(retryFetches);

            // The call to getMapCompletionEvents will advance fromEventId
            // to the value it should have for the next call
            List<MapOutputLocation> locs = getMapCompletionEvents(fromEventId);

            // put the discovered locations on the known list
            for (int i = 0; i < locs.size(); i++) {
              knownOutputs.add(locs.get(i));
            }
            LOG.info(reduceTask.getTaskId() +
                     " Got " + locs.size() +
                     " new map outputs from tasktracker and " + retryFetches.size()
                     + " map outputs from previous failures");
            // clear the "failed" fetches hashmap
            retryFetches.clear();
          }
          catch (IOException ie) {
            LOG.warn(reduceTask.getTaskId() +
                     " Problem locating map outputs: " +
                     StringUtils.stringifyException(ie));
          }
          // now walk through the cache and schedule what we can
          int numKnown = knownOutputs.size(), numScheduled = 0;
          int numSlow = 0, numDups = 0;

          LOG.info(reduceTask.getTaskId() + " Got " + numKnown +
                   " known map output location(s); scheduling...");

          synchronized (scheduledCopies) {
            // Randomize the map output locations to prevent
            // all reduce-tasks swamping the same tasktracker
            Collections.shuffle(knownOutputs, this.random);

            Iterator locIt = knownOutputs.iterator();

            currentTime = System.currentTimeMillis();
            while (locIt.hasNext()) {

              MapOutputLocation loc = (MapOutputLocation)locIt.next();
              Long penaltyEnd = penaltyBox.get(loc.getHost());
              boolean penalized = false, duplicate = false;

              if (penaltyEnd != null && currentTime < penaltyEnd.longValue()) {
                penalized = true; numSlow++;
              }
              if (uniqueHosts.contains(loc.getHost())) {
                duplicate = true; numDups++;
              }

              if (!penalized && !duplicate) {
                uniqueHosts.add(loc.getHost());
                scheduledCopies.add(loc);
                locIt.remove();                   // remove from knownOutputs
                numInFlight++; numScheduled++;
              }
            }
            scheduledCopies.notifyAll();
          }
          LOG.info(reduceTask.getTaskId() + " Scheduled " + numScheduled +
                   " of " + numKnown + " known outputs (" + numSlow +
                   " slow hosts and " + numDups + " dup hosts)");

          // if we have no copies in flight and we can't schedule anything
          // new, just wait for a bit
          try {
            if (numInFlight == 0 && numScheduled == 0) {
              Thread.sleep(5000);
            }
          } catch (InterruptedException e) { }    // IGNORE

          while (numInFlight > 0 && mergeThrowable == null) {
            LOG.debug(reduceTask.getTaskId() + " numInFlight = " +
                      numInFlight);
            CopyResult cr = getCopyResult();

            if (cr != null) {
              if (cr.getSuccess()) {              // a successful copy
                numCopied++;
                bytesTransferred += cr.getSize();

                long secsSinceStart =
                  (System.currentTimeMillis()-startTime)/1000+1;
                float mbs = ((float)bytesTransferred)/(1024*1024);
                float transferRate = mbs/secsSinceStart;

                copyPhase.startNextPhase();
                copyPhase.setStatus("copy (" + numCopied + " of " + numOutputs
                                    + " at " +
                                    mbpsFormat.format(transferRate) + " MB/s)");
              } else if (cr.isObsolete()) {
                // ignore
                LOG.info(reduceTask.getTaskId() +
                         " Ignoring obsolete copy result for Map Task: " +
                         cr.getLocation().getMapTaskId() + " from host: " +
                         cr.getHost());
              } else {
                retryFetches.add(cr.getLocation());

                // wait a random amount of time for next contact
                currentTime = System.currentTimeMillis();
                long nextContact = currentTime + 60 * 1000 +
                                   backoff.nextInt(maxBackoff*1000);
                penaltyBox.put(cr.getHost(), nextContact);
                LOG.warn(reduceTask.getTaskId() + " adding host " +
                         cr.getHost() + " to penalty box, next contact in " +
                         ((nextContact-currentTime)/1000) + " seconds");

                // other outputs from the failed host may be present in the
                // knownOutputs cache, purge them. This is important in case
                // the failure is due to a lost tasktracker (causes many
                // unnecessary backoffs). If not, we only take a small hit
                // polling the tasktracker a few more times
                Iterator locIt = knownOutputs.iterator();
                while (locIt.hasNext()) {
                  MapOutputLocation loc = (MapOutputLocation)locIt.next();
                  if (cr.getHost().equals(loc.getHost())) {
                    retryFetches.add(loc);
                    locIt.remove();
                  }
                }
              }
              uniqueHosts.remove(cr.getHost());
              numInFlight--;
            }

            boolean busy = true;
            // ensure we have enough to keep us busy
            if (numInFlight < lowThreshold && (numOutputs-numCopied) >
                probe_sample_size) {
              busy = false;
            }
            // Check whether we have more CopyResults to process. If there
            // are none, and we are not busy enough, break
            synchronized (copyResults) {
              if (copyResults.size() == 0 && !busy) {
                break;
              }
            }
          }
        }
        // all done, inform the copiers to exit
        synchronized (copiers) {
          synchronized (scheduledCopies) {
            for (int i = 0; i < copiers.length; i++) {
              copiers[i].interrupt();
              copiers[i] = null;
            }
          }
        }

        // Do a merge of in-memory files (if there are any)
        if (mergeThrowable == null) {
          try {
            // wait for an ongoing merge (if it is in flight) to complete
            while (mergeInProgress) {
              Thread.sleep(200);
            }
            LOG.info(reduceTask.getTaskId() +
                     " Copying of all map outputs complete. " +
                     "Initiating the last merge on the remaining files in " +
                     inMemFileSys.getUri());
            if (mergeThrowable != null) {
              // this could happen if the merge that
              // was in progress threw an exception
              throw mergeThrowable;
            }
            // initiate merge
            Path[] inMemClosedFiles = inMemFileSys.getFiles(MAP_OUTPUT_FILTER);
            if (inMemClosedFiles.length == 0) {
              LOG.info(reduceTask.getTaskId() + " Nothing to merge from " +
                       inMemFileSys.getUri());
              return neededOutputs.isEmpty();
            }
            // name this output file the same as the first file in the
            // current list of inmem files (this name is guaranteed to be
            // absent on disk currently, so we don't overwrite a previously
            // created spill). Also we need to create the output file now,
            // since it is not guaranteed that this file will be present
            // after merge is called (we delete empty sequence files as soon
            // as we see them in the merge method)
            int mapId = extractMapIdFromPathName(inMemClosedFiles[0]);
            Path outputPath = mapOutputFile.getInputFileForWrite(mapId,
                                reduceTask.getTaskId(), ramfsMergeOutputSize);
            SequenceFile.Writer writer = sorter.cloneFileAttributes(
              inMemFileSys.makeQualified(inMemClosedFiles[0]),
              localFileSys.makeQualified(outputPath), null);
            SequenceFile.Sorter.RawKeyValueIterator rIter = null;
            try {
              rIter = sorter.merge(inMemClosedFiles, true,
                                   inMemClosedFiles.length,
                                   new Path(reduceTask.getTaskId()));
            } catch (Exception e) {
              // make sure that we delete the ondisk file that we created
              // earlier when we invoked cloneFileAttributes
              writer.close();
              localFileSys.delete(outputPath);
              throw new IOException(StringUtils.stringifyException(e));
            }
            sorter.writeFile(rIter, writer);
            writer.close();
            LOG.info(reduceTask.getTaskId() +
                     " Merge of the " + inMemClosedFiles.length +
                     " files in InMemoryFileSystem complete." +
                     " Local file is " + outputPath);
          } catch (Throwable t) {
            LOG.warn(reduceTask.getTaskId() +
                     " Final merge of the inmemory files threw an exception: " +
                     StringUtils.stringifyException(t));
            return false;
          }
        }
        return mergeThrowable == null && neededOutputs.isEmpty();
      } finally {
        inMemFileSys.close();
        copyProgress.interrupt();
      }
    }
    private CopyResult getCopyResult() {
      synchronized (copyResults) {
        while (copyResults.isEmpty()) {
          try {
            copyResults.wait();
          } catch (InterruptedException e) { }
        }
        if (copyResults.isEmpty()) {
          return null;
        } else {
          return copyResults.remove(0);
        }
      }
    }
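
    // Note that the isEmpty() re-check after the wait loop is defensive:
    // InterruptedException is swallowed inside the loop, so the loop only
    // exits once a result is actually available.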
    /** Queries the task tracker for a set of outputs ready to be copied
     * @param fromEventId the first event ID we want to start from; this is
     * modified by the call to this method
     * @return a list of locations to copy outputs from
     * @throws IOException
     */
    private List<MapOutputLocation> getMapCompletionEvents(IntWritable fromEventId)
      throws IOException {

      long currentTime = System.currentTimeMillis();
      long pollTime = lastPollTime + MIN_POLL_INTERVAL;
      while (currentTime < pollTime) {
        try {
          Thread.sleep(pollTime-currentTime);
        } catch (InterruptedException ie) { }     // IGNORE
        currentTime = System.currentTimeMillis();
      }
      lastPollTime = currentTime;

      TaskCompletionEvent t[] = umbilical.getMapCompletionEvents(
                                  reduceTask.getJobId(),
                                  fromEventId.get(),
                                  probe_sample_size);

      List<MapOutputLocation> mapOutputsList =
        new ArrayList<MapOutputLocation>();
      for (TaskCompletionEvent event : t) {
        if (event.getTaskStatus() == TaskCompletionEvent.Status.SUCCEEDED) {
          URI u = URI.create(event.getTaskTrackerHttp());
          String host = u.getHost();
          int port = u.getPort();
          String taskId = event.getTaskId();
          int mId = event.idWithinJob();
          mapOutputsList.add(new MapOutputLocation(taskId, mId, host, port));
        } else if (event.getTaskStatus() == TaskCompletionEvent.Status.TIPFAILED) {
          neededOutputs.remove(event.idWithinJob());
          LOG.info("Ignoring output of failed map: '" + event.getTaskId() + "'");
        }
      }

      fromEventId.set(fromEventId.get() + t.length);
      return mapOutputsList;
    }
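
    // Each poll advances fromEventId by t.length, so successive calls walk
    // the tasktracker's completion-event log in windows of at most
    // probe_sample_size events.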
    private class InMemFSMergeThread extends Thread {
      private InMemoryFileSystem inMemFileSys;
      private LocalFileSystem localFileSys;
      private SequenceFile.Sorter sorter;

      public InMemFSMergeThread(InMemoryFileSystem inMemFileSys,
                                LocalFileSystem localFileSys,
                                SequenceFile.Sorter sorter) {
        this.inMemFileSys = inMemFileSys;
        this.localFileSys = localFileSys;
        this.sorter = sorter;
      }

      public void run() {
        LOG.info(reduceTask.getTaskId() + " Thread started: " + getName());
        try {
          // initiate merge
          Path[] inMemClosedFiles = inMemFileSys.getFiles(MAP_OUTPUT_FILTER);
          // Note that the above Path[] could be of length 0 if all copies
          // are in flight. So we make sure that we have some 'closed' map
          // output files to merge to get the benefit of in-memory merge
          if (inMemClosedFiles.length >=
              (int)(MAX_INMEM_FILESYS_USE/MAX_INMEM_FILESIZE_FRACTION)) {
            // name this output file the same as the first file in the
            // current list of inmem files (this name is guaranteed to be
            // absent on disk currently, so we don't overwrite a previously
            // created spill). Also we need to create the output file now,
            // since it is not guaranteed that this file will be present
            // after merge is called (we delete empty sequence files as soon
            // as we see them in the merge method)

            // figure out the mapId
            int mapId = extractMapIdFromPathName(inMemClosedFiles[0]);
            Path outputPath = mapOutputFile.getInputFileForWrite(mapId,
                                reduceTask.getTaskId(), ramfsMergeOutputSize);
            SequenceFile.Writer writer = sorter.cloneFileAttributes(
              inMemFileSys.makeQualified(inMemClosedFiles[0]),
              localFileSys.makeQualified(outputPath), null);
            SequenceFile.Sorter.RawKeyValueIterator rIter;
            try {
              rIter = sorter.merge(inMemClosedFiles, true,
                                   inMemClosedFiles.length,
                                   new Path(reduceTask.getTaskId()));
            } catch (Exception e) {
              // make sure that we delete the ondisk file that we created
              // earlier when we invoked cloneFileAttributes
              writer.close();
              localFileSys.delete(outputPath);
              throw new IOException(StringUtils.stringifyException(e));
            }
            sorter.writeFile(rIter, writer);
            writer.close();
            LOG.info(reduceTask.getTaskId() +
                     " Merge of the " + inMemClosedFiles.length +
                     " files in InMemoryFileSystem complete." +
                     " Local file is " + outputPath);
          }
          else {
            LOG.info(reduceTask.getTaskId() + " Nothing to merge from " +
                     inMemFileSys.getUri());
          }
        } catch (Throwable t) {
          LOG.warn(reduceTask.getTaskId() +
                   " Intermediate Merge of the inmemory files threw an exception: "
                   + StringUtils.stringifyException(t));
          ReduceCopier.this.mergeThrowable = t;
        }
        finally {
          mergeInProgress = false;
        }
      }
    }
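
    // The in-memory merge above only fires once at least
    // MAX_INMEM_FILESYS_USE / MAX_INMEM_FILESIZE_FRACTION closed files have
    // accumulated; both constants come from MRConstants (not shown in this
    // file), so the exact count depends on their values.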
    final private PathFilter MAP_OUTPUT_FILTER = new PathFilter() {
      public boolean accept(Path file) {
        return file.toString().endsWith(".out");
      }
    };
  }
}