/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapred;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URL;
import java.net.URLClassLoader;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.InMemoryFileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableFactories;
import org.apache.hadoop.io.WritableFactory;
import org.apache.hadoop.metrics.MetricsContext;
import org.apache.hadoop.metrics.MetricsRecord;
import org.apache.hadoop.metrics.MetricsUtil;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.metrics.Updater;

import static org.apache.hadoop.mapred.Task.Counter.*;

/** A Reduce task. */
class ReduceTask extends Task {

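  // Registering a WritableFactory lets the Writable machinery (e.g.
  // ObjectWritable during RPC deserialization) instantiate a ReduceTask
  // reflectively via WritableFactories.newInstance(ReduceTask.class).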
  static {                                        // register a ctor
    WritableFactories.setFactory
      (ReduceTask.class,
       new WritableFactory() {
         public Writable newInstance() { return new ReduceTask(); }
       });
  }

  private static final Log LOG = LogFactory.getLog(ReduceTask.class.getName());

  private int numMaps;
  private ReduceCopier reduceCopier;

  {
    getProgress().setStatus("reduce");
    setPhase(TaskStatus.Phase.SHUFFLE);           // phase to start with
  }

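  // Task progress is split into three sub-phases; Progress.addPhase weights
  // them equally, so copy, sort and reduce each account for a third of the
  // progress reported to the tasktracker.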
  private Progress copyPhase = getProgress().addPhase("copy");
  private Progress sortPhase = getProgress().addPhase("sort");
  private Progress reducePhase = getProgress().addPhase("reduce");

  public ReduceTask() {}

  public ReduceTask(String jobId, String jobFile, String tipId, String taskId,
                    int partition, int numMaps) {
    super(jobId, jobFile, tipId, taskId, partition);
    this.numMaps = numMaps;
  }

  public TaskRunner createRunner(TaskTracker tracker) throws IOException {
    return new ReduceTaskRunner(this, tracker, this.conf);
  }

  public boolean isMapTask() {
    return false;
  }

  public int getNumMaps() { return numMaps; }

  /**
   * Localize the given JobConf to be specific for this task.
   */
  public void localizeConfiguration(JobConf conf) throws IOException {
    super.localizeConfiguration(conf);
    conf.setNumMapTasks(numMaps);
  }

  public void write(DataOutput out) throws IOException {
    super.write(out);
    out.writeInt(numMaps);                        // write the number of maps
  }

  public void readFields(DataInput in) throws IOException {
    super.readFields(in);
    numMaps = in.readInt();
  }

  /** Iterates values while keys match in sorted input. */
  static class ValuesIterator implements Iterator {
    private SequenceFile.Sorter.RawKeyValueIterator in; //input iterator
    private WritableComparable key;               // current key
    private Writable value;                       // current value
    private boolean hasNext;                      // more w/ this key
    private boolean more;                         // more in file
    private WritableComparator comparator;
    private Class keyClass;
    private Class valClass;
    private Configuration conf;
    private DataOutputBuffer valOut = new DataOutputBuffer();
    private DataInputBuffer valIn = new DataInputBuffer();
    private DataInputBuffer keyIn = new DataInputBuffer();
    protected Reporter reporter;

    public ValuesIterator (SequenceFile.Sorter.RawKeyValueIterator in,
                           WritableComparator comparator, Class keyClass,
                           Class valClass, Configuration conf,
                           Reporter reporter)
      throws IOException {
      this.in = in;
      this.conf = conf;
      this.comparator = comparator;
      this.keyClass = keyClass;
      this.valClass = valClass;
      this.reporter = reporter;
      getNext();
    }

    /// Iterator methods

    public boolean hasNext() { return hasNext; }

    public Object next() {
      Object result = value;                      // save value
      try {
        getNext();                                // move to next
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
      reporter.progress();
      return result;                              // return saved value
    }

    public void remove() { throw new RuntimeException("not implemented"); }

    /// Auxiliary methods

    /** Start processing next unique key. */
    public void nextKey() {
      while (hasNext) { next(); }                 // skip any unread
      hasNext = more;
    }

    /** True iff more keys remain. */
    public boolean more() { return more; }

    /** The current key. */
    public WritableComparable getKey() { return key; }

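    // Reads the next raw key/value pair from the merged stream, deserializes
    // both, and sets hasNext according to whether the new key compares equal
    // to the previous one under the grouping comparator, i.e. whether it
    // still belongs to the current value group.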
    private void getNext() throws IOException {
      Writable lastKey = key;                     // save previous key
      try {
        key = (WritableComparable)ReflectionUtils.newInstance(keyClass, this.conf);
        value = (Writable)ReflectionUtils.newInstance(valClass, this.conf);
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
      more = in.next();
      if (more) {
        //de-serialize the raw key/value
        keyIn.reset(in.getKey().getData(), in.getKey().getLength());
        key.readFields(keyIn);
        valOut.reset();
        (in.getValue()).writeUncompressedBytes(valOut);
        valIn.reset(valOut.getData(), valOut.getLength());
        value.readFields(valIn);

        if (lastKey == null) {
          hasNext = true;
        } else {
          hasNext = (comparator.compare(key, lastKey) == 0);
        }
      } else {
        hasNext = false;
      }
    }
  }

  private class ReduceValuesIterator extends ValuesIterator {
    public ReduceValuesIterator (SequenceFile.Sorter.RawKeyValueIterator in,
                                 WritableComparator comparator, Class keyClass,
                                 Class valClass,
                                 Configuration conf, Reporter reporter)
      throws IOException {
      super(in, comparator, keyClass, valClass, conf, reporter);
    }

    public void informReduceProgress() {
      reducePhase.set(super.in.getProgress().get()); // update progress
      reporter.progress();
    }

    public Object next() {
      reporter.incrCounter(REDUCE_INPUT_RECORDS, 1);
      return super.next();
    }
  }

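  // run() drives the three phases in order: shuffle (ReduceCopier fetches the
  // map outputs, skipped for local-mode jobs), sort (an on-disk merge of the
  // copied map output files), and reduce (one reduce() call per key group).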
  public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
    throws IOException {
    Reducer reducer = (Reducer)ReflectionUtils.newInstance(
                                   job.getReducerClass(), job);

    // start thread that will handle communication with parent
    startCommunicationThread(umbilical);

    FileSystem lfs = FileSystem.getLocal(job);
    if (!job.get("mapred.job.tracker", "local").equals("local")) {
      reduceCopier = new ReduceCopier(umbilical, job);
      if (!reduceCopier.fetchOutputs()) {
        throw new IOException(getTaskId() + ": The reduce copier failed");
      }
    }
    copyPhase.complete();                         // copy is already complete

    // open a file to collect map output
    // since we don't know how many map outputs got merged in memory, we have
    // to check whether a given map output exists, and if it does, add it to
    // the list of files to merge, otherwise not.
    List<Path> mapFilesList = new ArrayList<Path>();
    for (int i = 0; i < numMaps; i++) {
      Path f;
      try {
        //catch and ignore DiskErrorException, since some map outputs will
        //really be absent (inmem merge).
        f = mapOutputFile.getInputFile(i, getTaskId());
      } catch (DiskErrorException d) {
        continue;
      }
      if (lfs.exists(f))
        mapFilesList.add(f);
    }
    Path[] mapFiles = new Path[mapFilesList.size()];
    mapFiles = mapFilesList.toArray(mapFiles);

    Path tempDir = new Path(getTaskId());

    SequenceFile.Sorter.RawKeyValueIterator rIter;

    setPhase(TaskStatus.Phase.SORT);

    final Reporter reporter = getReporter(umbilical);

    // sort the input file
    SequenceFile.Sorter sorter = new SequenceFile.Sorter(lfs,
        job.getOutputKeyComparator(), job.getMapOutputValueClass(), job);
    sorter.setProgressable(reporter);
    rIter = sorter.merge(mapFiles, tempDir,
                         !conf.getKeepFailedTaskFiles()); // sort

    sortPhase.complete();                         // sort is complete
    setPhase(TaskStatus.Phase.REDUCE);

    // make output collector
    String finalName = getOutputName(getPartition());
    FileSystem fs = FileSystem.get(job);

    final RecordWriter out =
      job.getOutputFormat().getRecordWriter(fs, job, finalName, reporter);

    OutputCollector collector = new OutputCollector() {
        public void collect(WritableComparable key, Writable value)
          throws IOException {
          out.write(key, value);
          reporter.incrCounter(REDUCE_OUTPUT_RECORDS, 1);
          // indicate that progress update needs to be sent
          reporter.progress();
        }
      };

    // apply reduce function
    try {
      Class keyClass = job.getMapOutputKeyClass();
      Class valClass = job.getMapOutputValueClass();

      ReduceValuesIterator values = new ReduceValuesIterator(rIter,
          job.getOutputValueGroupingComparator(), keyClass, valClass,
          job, reporter);
      values.informReduceProgress();
      while (values.more()) {
        reporter.incrCounter(REDUCE_INPUT_GROUPS, 1);
        reducer.reduce(values.getKey(), values, collector, reporter);
        values.nextKey();
        values.informReduceProgress();
      }

      //Clean up: repeated in catch block below
      reducer.close();
      out.close(reporter);
      //End of clean up.
    } catch (IOException ioe) {
      try {
        reducer.close();
      } catch (IOException ignored) {}

      try {
        out.close(reporter);
      } catch (IOException ignored) {}

      throw ioe;
    }
    done(umbilical);
  }

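  // ReduceCopier implements the shuffle: a pool of MapOutputCopier threads
  // fetches map outputs over HTTP into an in-memory file system (ramfs) when
  // they fit, falling back to local disk, while a background thread merges
  // accumulated in-memory files to an on-disk spill once usage or file-count
  // thresholds are crossed.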
  class ReduceCopier implements MRConstants {

    /** Reference to the umbilical object */
    private TaskUmbilicalProtocol umbilical;

    /** Number of ms before timing out a copy */
    private static final int STALLED_COPY_TIMEOUT = 3 * 60 * 1000;

    /**
     * our reduce task instance
     */
    private ReduceTask reduceTask;

    /**
     * the list of map outputs currently being copied
     */
    private List<MapOutputLocation> scheduledCopies;

    /**
     * the results of dispatched copy attempts
     */
    private List<CopyResult> copyResults;

    /**
     * the number of outputs to copy in parallel
     */
    private int numCopiers;

    /**
     * the maximum amount of time (less 1 minute) to wait to
     * contact a host after a copy from it fails. We wait for (1 min +
     * Random.nextInt(maxBackoff)) seconds.
     */
    private int maxBackoff;

    /**
     * busy hosts from which copies are being backed off
     * Map of host -> next contact time
     */
    private Map<String, Long> penaltyBox;

    /**
     * the set of unique hosts from which we are copying
     */
    private Set<String> uniqueHosts;

    /**
     * the last time we polled the job tracker
     */
    private long lastPollTime;

    /**
     * A reference to the in memory file system for writing the map outputs to.
     */
    private InMemoryFileSystem inMemFileSys;

    /**
     * A reference to the local file system for writing the map outputs to.
     */
    private FileSystem localFileSys;

    /**
     * An instance of the sorter used for doing merge
     */
    private SequenceFile.Sorter sorter;

    /**
     * A reference to the throwable object (if merge throws an exception)
     */
    private volatile Throwable mergeThrowable;

    /**
     * A flag to indicate that merge is in progress
     */
    private volatile boolean mergeInProgress = false;

    /**
     * When we accumulate mergeThreshold number of files in ram, we merge/spill
     */
    private int mergeThreshold = 500;

    /**
     * The threads for fetching the files.
     */
    private MapOutputCopier[] copiers = null;

    /**
     * The object for metrics reporting.
     */
    private ShuffleClientMetrics shuffleClientMetrics = null;

    /**
     * the minimum interval between tasktracker polls
     */
    private static final long MIN_POLL_INTERVAL = 1000;

    /**
     * the number of map output locations to poll for at one time
     */
    private int probe_sample_size = 100;

    /**
     * a list of map output locations for fetch retrials
     */
    private List<MapOutputLocation> retryFetches =
      new ArrayList<MapOutputLocation>();

    /**
     * The set of required map outputs
     */
    private Set<Integer> neededOutputs =
      Collections.synchronizedSet(new TreeSet<Integer>());

    /**
     * The set of obsolete map taskids.
     */
    private Set<String> obsoleteMapIds =
      Collections.synchronizedSet(new TreeSet<String>());

    private Random random = null;

    /**
     * the max size of the merge output from ramfs
     */
    private long ramfsMergeOutputSize;

    /**
     * This class contains the methods that should be used for metrics-reporting
     * the specific metrics for shuffle. This class actually reports the
     * metrics for the shuffle client (the ReduceTask), and hence the name
     * ShuffleClientMetrics.
     */
    class ShuffleClientMetrics implements Updater {
      private MetricsRecord shuffleMetrics = null;
      private int numFailedFetches = 0;
      private int numSuccessFetches = 0;
      private long numBytes = 0;
      private int numThreadsBusy = 0;

      ShuffleClientMetrics(JobConf conf) {
        MetricsContext metricsContext = MetricsUtil.getContext("mapred");
        this.shuffleMetrics =
          MetricsUtil.createRecord(metricsContext, "shuffleInput");
        this.shuffleMetrics.setTag("user", conf.getUser());
        this.shuffleMetrics.setTag("jobName", conf.getJobName());
        this.shuffleMetrics.setTag("jobId", ReduceTask.this.getJobId());
        this.shuffleMetrics.setTag("taskId", getTaskId());
        this.shuffleMetrics.setTag("sessionId", conf.getSessionId());
        metricsContext.registerUpdater(this);
      }
      public synchronized void inputBytes(long numBytes) {
        this.numBytes += numBytes;
      }
      public synchronized void failedFetch() {
        ++numFailedFetches;
      }
      public synchronized void successFetch() {
        ++numSuccessFetches;
      }
      public synchronized void threadBusy() {
        ++numThreadsBusy;
      }
      public synchronized void threadFree() {
        --numThreadsBusy;
      }
      public void doUpdates(MetricsContext unused) {
        synchronized (this) {
          shuffleMetrics.incrMetric("shuffle_input_bytes", numBytes);
          shuffleMetrics.incrMetric("shuffle_failed_fetches",
                                    numFailedFetches);
          shuffleMetrics.incrMetric("shuffle_success_fetches",
                                    numSuccessFetches);
          if (numCopiers != 0) {
            shuffleMetrics.setMetric("shuffle_fetchers_busy_percent",
                100*((float)numThreadsBusy/numCopiers));
          } else {
            shuffleMetrics.setMetric("shuffle_fetchers_busy_percent", 0);
          }
          numBytes = 0;
          numSuccessFetches = 0;
          numFailedFetches = 0;
        }
        shuffleMetrics.update();
      }
    }

    /** Represents the result of an attempt to copy a map output */
    private class CopyResult {

      // the map output location against which a copy attempt was made
      private final MapOutputLocation loc;

      // the size of the file copied, -1 if the transfer failed
      private final long size;

      //a flag signifying whether a copy result is obsolete
      private static final int OBSOLETE = -2;

      CopyResult(MapOutputLocation loc, long size) {
        this.loc = loc;
        this.size = size;
      }

      public int getMapId() { return loc.getMapId(); }
      public boolean getSuccess() { return size >= 0; }
      public boolean isObsolete() {
        return size == OBSOLETE;
      }
      public long getSize() { return size; }
      public String getHost() { return loc.getHost(); }
      public MapOutputLocation getLocation() { return loc; }
    }

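    // Map output files are named map_<id>.out, so the map id is whatever sits
    // between the last "map_" and the last ".out" in the file name; e.g. a
    // path ending in "map_12.out" yields 12.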
    private int extractMapIdFromPathName(Path pathname) {
      //all paths end with map_<id>.out
      String firstPathName = pathname.getName();
      int beginIndex = firstPathName.lastIndexOf("map_");
      int endIndex = firstPathName.lastIndexOf(".out");
      return Integer.parseInt(firstPathName.substring(beginIndex +
                              "map_".length(), endIndex));
    }

    private int nextMapOutputCopierId = 0;

    /** Copies map outputs as they become available */
    private class MapOutputCopier extends Thread {

      private MapOutputLocation currentLocation = null;
      private int id = nextMapOutputCopierId++;
      private Reporter reporter;

      public MapOutputCopier(Reporter reporter) {
        setName("MapOutputCopier " + reduceTask.getTaskId() + "." + id);
        LOG.debug(getName() + " created");
        this.reporter = reporter;
      }

      /**
       * Fail the current file that we are fetching
       * @return were we currently fetching?
       */
      public synchronized boolean fail() {
        if (currentLocation != null) {
          finish(-1);
          return true;
        } else {
          return false;
        }
      }

      /**
       * Get the current map output location.
       */
      public synchronized MapOutputLocation getLocation() {
        return currentLocation;
      }

      private synchronized void start(MapOutputLocation loc) {
        currentLocation = loc;
      }

      private synchronized void finish(long size) {
        if (currentLocation != null) {
          LOG.debug(getName() + " finishing " + currentLocation + " =" + size);
          synchronized (copyResults) {
            copyResults.add(new CopyResult(currentLocation, size));
            copyResults.notify();
          }
          currentLocation = null;
        }
      }

      /** Loop forever and fetch map outputs as they become available.
       * The thread exits when it is interrupted by {@link ReduceTaskRunner}
       */
      public void run() {
        while (true) {
          try {
            MapOutputLocation loc = null;
            long size = -1;

            synchronized (scheduledCopies) {
              while (scheduledCopies.isEmpty()) {
                scheduledCopies.wait();
              }
              loc = scheduledCopies.remove(0);
            }

            try {
              shuffleClientMetrics.threadBusy();
              start(loc);
              size = copyOutput(loc);
              shuffleClientMetrics.successFetch();
            } catch (IOException e) {
              LOG.warn(reduceTask.getTaskId() + " copy failed: " +
                       loc.getMapTaskId() + " from " + loc.getHost());
              LOG.warn(StringUtils.stringifyException(e));
              shuffleClientMetrics.failedFetch();
              // Reset
              size = -1;
            } finally {
              shuffleClientMetrics.threadFree();
              finish(size);
            }
          } catch (InterruptedException e) {
            return; // ALL DONE
          } catch (Throwable th) {
            LOG.error("Map output copy failure: " +
                      StringUtils.stringifyException(th));
          }
        }
      }

      /** Copies a map output from a remote host, via HTTP.
       * @param loc the map output location to be copied
       * @return the number of bytes copied, or {@link CopyResult#OBSOLETE}
       * if the output is no longer needed
       * @throws IOException if there is an error copying the file
       * @throws InterruptedException if the copier should give up
       */
      private long copyOutput(MapOutputLocation loc)
        throws IOException, InterruptedException {
        // check if we still need to copy the output from this location
        if (!neededOutputs.contains(loc.getMapId()) ||
            obsoleteMapIds.contains(loc.getMapTaskId())) {
          return CopyResult.OBSOLETE;
        }

        String reduceId = reduceTask.getTaskId();
        LOG.info(reduceId + " Copying " + loc.getMapTaskId() +
                 " output from " + loc.getHost() + ".");

        // a temp filename. If this file gets created in ramfs, we're fine,
        // else, we will check the localFS to find a suitable final location
        // for this path
        Path filename = new Path("/" + reduceId + "/map_" +
                                 loc.getMapId() + ".out");
        // a working filename that will be unique to this attempt
        Path tmpFilename = new Path(filename + "-" + id);
        // this copies the map output file
        tmpFilename = loc.getFile(inMemFileSys, localFileSys, shuffleClientMetrics,
                                  tmpFilename, lDirAlloc,
                                  conf, reduceTask.getPartition(),
                                  STALLED_COPY_TIMEOUT, reporter);
        if (!neededOutputs.contains(loc.getMapId())) {
          if (tmpFilename != null) {
            FileSystem fs = tmpFilename.getFileSystem(conf);
            fs.delete(tmpFilename);
          }
          return CopyResult.OBSOLETE;
        }
        if (tmpFilename == null)
          throw new IOException("File " + filename + "-" + id +
                                " not created");
        long bytes = -1;

        // lock the ReduceTask while we do the rename
        synchronized (ReduceTask.this) {
          // This file could have been created in the inmemory
          // fs or the localfs. So need to get the filesystem owning the path.
          FileSystem fs = tmpFilename.getFileSystem(conf);
          if (!neededOutputs.contains(loc.getMapId())) {
            fs.delete(tmpFilename);
            return CopyResult.OBSOLETE;
          }

          bytes = fs.getLength(tmpFilename);

          //resolve the final filename against the directory where the tmpFile
          //got created
          filename = new Path(tmpFilename.getParent(), filename.getName());
          // if we can't rename the file, something is broken (and IOException
          // will be thrown).
          if (!fs.rename(tmpFilename, filename)) {
            fs.delete(tmpFilename);
            bytes = -1;
            throw new IOException("failure to rename map output " +
                                  tmpFilename);
          }

          LOG.info(reduceId + " done copying " + loc.getMapTaskId() +
                   " output from " + loc.getHost() + ".");

          //Create a thread to do merges. Synchronize access/update to
          //mergeInProgress
          if (!mergeInProgress &&
              (inMemFileSys.getPercentUsed() >= MAX_INMEM_FILESYS_USE ||
               (mergeThreshold > 0 &&
                inMemFileSys.getNumFiles(MAP_OUTPUT_FILTER) >=
                mergeThreshold)) &&
              mergeThrowable == null) {
            LOG.info(reduceId + " InMemoryFileSystem " +
                     inMemFileSys.getUri().toString() +
                     " is " + inMemFileSys.getPercentUsed() +
                     " full. Triggering merge");
            InMemFSMergeThread m = new InMemFSMergeThread(inMemFileSys,
                (LocalFileSystem)localFileSys, sorter);
            m.setName("Thread for merging in memory files");
            m.setDaemon(true);
            mergeInProgress = true;
            m.start();
          }
          neededOutputs.remove(loc.getMapId());
        }
        return bytes;
      }
    }

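    // Builds the task's classloader: the job jar's lib/*.jar files, its
    // classes/ directory, the unpacked job cache directory, and the task work
    // directory are collected into a URLClassLoader whose parent is the
    // JobConf's current classloader.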
    private void configureClasspath(JobConf conf)
      throws IOException {

      // get the task and the current classloader which will become the parent
      Task task = ReduceTask.this;
      ClassLoader parent = conf.getClassLoader();

      // get the work directory which holds the elements we are dynamically
      // adding to the classpath
      File workDir = new File(task.getJobFile()).getParentFile();
      File jobCacheDir = new File(workDir.getParent(), "work");
      ArrayList<URL> urllist = new ArrayList<URL>();

      // add the jars and directories to the classpath
      String jar = conf.getJar();
      if (jar != null) {
        File[] libs = new File(jobCacheDir, "lib").listFiles();
        if (libs != null) {
          for (int i = 0; i < libs.length; i++) {
            urllist.add(libs[i].toURL());
          }
        }
        urllist.add(new File(jobCacheDir, "classes").toURL());
        urllist.add(jobCacheDir.toURL());
      }
      urllist.add(workDir.toURL());

      // create a new classloader with the old classloader as its parent
      // then set that classloader as the one used by the current jobconf
      URL[] urls = urllist.toArray(new URL[urllist.size()]);
      URLClassLoader loader = new URLClassLoader(urls, parent);
      conf.setClassLoader(loader);
    }

    public ReduceCopier(TaskUmbilicalProtocol umbilical, JobConf conf)
      throws IOException {

      configureClasspath(conf);
      this.shuffleClientMetrics = new ShuffleClientMetrics(conf);
      this.umbilical = umbilical;
      this.reduceTask = ReduceTask.this;
      this.scheduledCopies = new ArrayList<MapOutputLocation>(100);
      this.copyResults = new ArrayList<CopyResult>(100);
      this.numCopiers = conf.getInt("mapred.reduce.parallel.copies", 5);
      this.maxBackoff = conf.getInt("mapred.reduce.copy.backoff", 300);
      this.mergeThreshold = conf.getInt("mapred.inmem.merge.threshold", 1000);

      //we want to distinguish inmem fs instances for different reduces. Hence,
      //append a unique string in the uri for the inmem fs name
      URI uri = URI.create("ramfs://mapoutput" + reduceTask.hashCode());
      inMemFileSys = (InMemoryFileSystem)FileSystem.get(uri, conf);
      LOG.info(reduceTask.getTaskId() + " Created an InMemoryFileSystem, uri: "
               + uri);
      ramfsMergeOutputSize = (long)(MAX_INMEM_FILESYS_USE *
                                    inMemFileSys.getFSSize());
      localFileSys = FileSystem.getLocal(conf);
      //create an instance of the sorter
      sorter =
        new SequenceFile.Sorter(inMemFileSys, conf.getOutputKeyComparator(),
                                conf.getMapOutputValueClass(), conf);
      sorter.setProgressable(getReporter(umbilical));

      // hosts -> next contact time
      this.penaltyBox = new Hashtable<String, Long>();

      // hostnames
      this.uniqueHosts = new HashSet<String>();

      this.lastPollTime = 0;

      // Seed the random number generator with a reasonably globally unique seed
      long randomSeed = System.nanoTime() +
                        (long)Math.pow(this.reduceTask.getPartition(),
                                       (this.reduceTask.getPartition()%10));
      this.random = new Random(randomSeed);
    }

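    // fetchOutputs is the main shuffle loop: it repeatedly asks the
    // tasktracker (via the umbilical) for map-completion events, schedules at
    // most one fetch per unique host at a time, penalizes hosts whose fetches
    // fail, and returns true once every needed map output has been copied and
    // the final in-memory merge has succeeded.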
    public boolean fetchOutputs() throws IOException {
      final int numOutputs = reduceTask.getNumMaps();
      List<MapOutputLocation> knownOutputs =
        new ArrayList<MapOutputLocation>(numCopiers);
      int numInFlight = 0, numCopied = 0;
      int lowThreshold = numCopiers*2;
      long bytesTransferred = 0;
      DecimalFormat mbpsFormat = new DecimalFormat("0.00");
      Random backoff = new Random();
      final Progress copyPhase =
        reduceTask.getProgress().phase();

      //tweak the probe sample size (make it a function of numCopiers)
      probe_sample_size = Math.max(numCopiers*5, 50);

      for (int i = 0; i < numOutputs; i++) {
        neededOutputs.add(i);
        copyPhase.addPhase();                     // add sub-phase per file
      }

      copiers = new MapOutputCopier[numCopiers];

      Reporter reporter = getReporter(umbilical);
      // start all the copying threads
      for (int i = 0; i < copiers.length; i++) {
        copiers[i] = new MapOutputCopier(reporter);
        copiers[i].start();
      }

      // start the clock for bandwidth measurement
      long startTime = System.currentTimeMillis();
      long currentTime = startTime;
      IntWritable fromEventId = new IntWritable(0);

      try {
        // loop until we get all required outputs
        while (!neededOutputs.isEmpty() && mergeThrowable == null) {

          LOG.info(reduceTask.getTaskId() + " Need " + neededOutputs.size() +
                   " map output(s)");

          try {
            // Put the hash entries for the failed fetches. Entries here
            // might be replaced by (mapId) hashkeys from new successful
            // Map executions, if the fetch failures were due to lost tasks.
            // The replacements, if at all, will happen when we query the
            // tasktracker and put the mapId hashkeys with new
            // MapOutputLocations as values
            knownOutputs.addAll(retryFetches);
            // The call to getMapCompletionEvents will update fromEventId to
            // be used for the next call to getMapCompletionEvents
            int currentNumKnownMaps = knownOutputs.size();
            int currentNumObsoleteMapIds = obsoleteMapIds.size();
            getMapCompletionEvents(fromEventId, knownOutputs);

            LOG.info(reduceTask.getTaskId() + ": " +
                     "Got " + (knownOutputs.size()-currentNumKnownMaps) +
                     " new map-outputs & " +
                     (obsoleteMapIds.size()-currentNumObsoleteMapIds) +
                     " obsolete map-outputs from tasktracker and " +
                     retryFetches.size() + " map-outputs from previous failures");

            // clear the "failed" fetches hashmap
            retryFetches.clear();
          }
          catch (IOException ie) {
            LOG.warn(reduceTask.getTaskId() +
                     " Problem locating map outputs: " +
                     StringUtils.stringifyException(ie));
          }

          // now walk through the cache and schedule what we can
          int numKnown = knownOutputs.size(), numScheduled = 0;
          int numSlow = 0, numDups = 0;

          LOG.info(reduceTask.getTaskId() + " Got " + numKnown +
                   " known map output location(s); scheduling...");

          synchronized (scheduledCopies) {
            // Randomize the map output locations to prevent
            // all reduce-tasks swamping the same tasktracker
            Collections.shuffle(knownOutputs, this.random);

            Iterator locIt = knownOutputs.iterator();

            currentTime = System.currentTimeMillis();
            while (locIt.hasNext()) {

              MapOutputLocation loc = (MapOutputLocation)locIt.next();

              // Do not schedule fetches from OBSOLETE maps
              if (obsoleteMapIds.contains(loc.getMapTaskId())) {
                locIt.remove();
                continue;
              }

              Long penaltyEnd = penaltyBox.get(loc.getHost());
              boolean penalized = false, duplicate = false;

              if (penaltyEnd != null && currentTime < penaltyEnd.longValue()) {
                penalized = true; numSlow++;
              }
              if (uniqueHosts.contains(loc.getHost())) {
                duplicate = true; numDups++;
              }

              if (!penalized && !duplicate) {
                uniqueHosts.add(loc.getHost());
                scheduledCopies.add(loc);
                locIt.remove();                   // remove from knownOutputs
                numInFlight++; numScheduled++;
              }
            }
            scheduledCopies.notifyAll();
          }

          LOG.info(reduceTask.getTaskId() + " Scheduled " + numScheduled +
                   " of " + numKnown + " known outputs (" + numSlow +
                   " slow hosts and " + numDups + " dup hosts)");

          // if we have no copies in flight and we can't schedule anything
          // new, just wait for a bit
          try {
            if (numInFlight == 0 && numScheduled == 0) {
              // we should indicate progress as we don't want TT to think
              // we're stuck and kill us
              reporter.progress();
              Thread.sleep(5000);
            }
          } catch (InterruptedException e) { } // IGNORE

          while (numInFlight > 0 && mergeThrowable == null) {
            LOG.debug(reduceTask.getTaskId() + " numInFlight = " +
                      numInFlight);
            CopyResult cr = getCopyResult();

            if (cr != null) {
              if (cr.getSuccess()) {              // a successful copy
                numCopied++;
                bytesTransferred += cr.getSize();

                long secsSinceStart =
                  (System.currentTimeMillis()-startTime)/1000+1;
                float mbs = ((float)bytesTransferred)/(1024*1024);
                float transferRate = mbs/secsSinceStart;

                copyPhase.startNextPhase();
                copyPhase.setStatus("copy (" + numCopied + " of " + numOutputs
                                    + " at " +
                                    mbpsFormat.format(transferRate) + " MB/s)");
              } else if (cr.isObsolete()) {
                //ignore
                LOG.info(reduceTask.getTaskId() +
                         " Ignoring obsolete copy result for Map Task: " +
                         cr.getLocation().getMapTaskId() + " from host: " +
                         cr.getHost());
              } else {
                retryFetches.add(cr.getLocation());

                // wait a random amount of time for next contact
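                // (with the default mapred.reduce.copy.backoff of 300, the
                // penalty is 60 seconds plus up to a further 300 seconds)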
                currentTime = System.currentTimeMillis();
                long nextContact = currentTime + 60 * 1000 +
                                   backoff.nextInt(maxBackoff*1000);
                penaltyBox.put(cr.getHost(), nextContact);
                LOG.warn(reduceTask.getTaskId() + " adding host " +
                         cr.getHost() + " to penalty box, next contact in " +
                         ((nextContact-currentTime)/1000) + " seconds");

                // other outputs from the failed host may be present in the
                // knownOutputs cache, purge them. This is important in case
                // the failure is due to a lost tasktracker (causes many
                // unnecessary backoffs). If not, we only take a small hit
                // polling the tasktracker a few more times
                Iterator locIt = knownOutputs.iterator();
                while (locIt.hasNext()) {
                  MapOutputLocation loc = (MapOutputLocation)locIt.next();
                  if (cr.getHost().equals(loc.getHost())) {
                    retryFetches.add(loc);
                    locIt.remove();
                  }
                }
              }
              uniqueHosts.remove(cr.getHost());
              numInFlight--;
            }

            boolean busy = true;
            // ensure we have enough to keep us busy
            if (numInFlight < lowThreshold && (numOutputs-numCopied) >
                probe_sample_size) {
              busy = false;
            }
            //Check whether we have more CopyResult to check. If there is none,
            //and we are not busy enough, break
            synchronized (copyResults) {
              if (copyResults.size() == 0 && !busy) {
                break;
              }
            }
          }
        }

        // all done, inform the copiers to exit
        synchronized (copiers) {
          synchronized (scheduledCopies) {
            for (int i = 0; i < copiers.length; i++) {
              copiers[i].interrupt();
              copiers[i] = null;
            }
          }
        }

        //Do a merge of in-memory files (if there are any)
        if (mergeThrowable == null) {
          try {
            //wait for an ongoing merge (if it is in flight) to complete
            while (mergeInProgress) {
              Thread.sleep(200);
            }
            LOG.info(reduceTask.getTaskId() +
                     " Copying of all map outputs complete. " +
                     "Initiating the last merge on the remaining files in " +
                     inMemFileSys.getUri());
            if (mergeThrowable != null) {
              //this could happen if the merge that
              //was in progress threw an exception
              throw mergeThrowable;
            }
            //initiate merge
            Path[] inMemClosedFiles = inMemFileSys.getFiles(MAP_OUTPUT_FILTER);
            if (inMemClosedFiles.length == 0) {
              LOG.info(reduceTask.getTaskId() + " Nothing to merge from " +
                       inMemFileSys.getUri());
              return neededOutputs.isEmpty();
            }
            //name this output file same as the name of the first file that is
            //there in the current list of inmem files (this is guaranteed to be
            //absent on the disk currently. So we don't overwrite a prev.
            //created spill). Also we need to create the output file now since
            //it is not guaranteed that this file will be present after merge
            //is called (we delete empty sequence files as soon as we see them
            //in the merge method)
            int mapId = extractMapIdFromPathName(inMemClosedFiles[0]);
            Path outputPath = mapOutputFile.getInputFileForWrite(mapId,
                                  reduceTask.getTaskId(), ramfsMergeOutputSize);
            SequenceFile.Writer writer = sorter.cloneFileAttributes(
                inMemFileSys.makeQualified(inMemClosedFiles[0]),
                localFileSys.makeQualified(outputPath), null);
            SequenceFile.Sorter.RawKeyValueIterator rIter = null;
            try {
              rIter = sorter.merge(inMemClosedFiles, true,
                                   inMemClosedFiles.length,
                                   new Path(reduceTask.getTaskId()));
            } catch (Exception e) {
              //make sure that we delete the ondisk file that we created earlier
              //when we invoked cloneFileAttributes
              writer.close();
              localFileSys.delete(outputPath);
              throw new IOException(StringUtils.stringifyException(e));
            }
            sorter.writeFile(rIter, writer);
            writer.close();
            LOG.info(reduceTask.getTaskId() +
                     " Merge of the " + inMemClosedFiles.length +
                     " files in InMemoryFileSystem complete." +
                     " Local file is " + outputPath);
          } catch (Throwable t) {
            LOG.warn(reduceTask.getTaskId() +
                     " Final merge of the inmemory files threw an exception: " +
                     StringUtils.stringifyException(t));
            return false;
          }
        }
        return mergeThrowable == null && neededOutputs.isEmpty();
      } finally {
        inMemFileSys.close();
      }
    }

    private CopyResult getCopyResult() {
      synchronized (copyResults) {
        while (copyResults.isEmpty()) {
          try {
            copyResults.wait();
          } catch (InterruptedException e) { }
        }
        if (copyResults.isEmpty()) {
          return null;
        } else {
          return copyResults.remove(0);
        }
      }
    }

    /**
     * Queries the tasktracker for a set of map-completion events from
     * a given event ID.
     * @param fromEventId the first event ID we want to start from; this is
     * modified by the call to this method
     * @param knownOutputs the list to which locations of newly completed
     * map outputs are added
     * @throws IOException
     */
    private void getMapCompletionEvents(IntWritable fromEventId,
                                        List<MapOutputLocation> knownOutputs)
      throws IOException {
      long currentTime = System.currentTimeMillis();
      long pollTime = lastPollTime + MIN_POLL_INTERVAL;
      while (currentTime < pollTime) {
        try {
          Thread.sleep(pollTime-currentTime);
        } catch (InterruptedException ie) { } // IGNORE
        currentTime = System.currentTimeMillis();
      }

      TaskCompletionEvent events[] =
        umbilical.getMapCompletionEvents(reduceTask.getJobId(),
                                         fromEventId.get(), probe_sample_size);

      // Note the last successful poll time-stamp
      lastPollTime = currentTime;

      // Update the last seen event ID
      fromEventId.set(fromEventId.get() + events.length);

      // Process the TaskCompletionEvents:
      // 1. Save the SUCCEEDED maps in knownOutputs to fetch the outputs.
      // 2. Save the OBSOLETE/FAILED/KILLED maps in obsoleteOutputs to stop
      //    fetching from those maps.
      // 3. Remove TIPFAILED maps from neededOutputs since we don't need their
      //    outputs at all.
      for (TaskCompletionEvent event : events) {
        switch (event.getTaskStatus()) {
        case SUCCEEDED:
          {
            URI u = URI.create(event.getTaskTrackerHttp());
            String host = u.getHost();
            int port = u.getPort();
            String taskId = event.getTaskId();
            int mId = event.idWithinJob();
            knownOutputs.add(new MapOutputLocation(taskId, mId, host, port));
          }
          break;
        case FAILED:
        case KILLED:
        case OBSOLETE:
          {
            obsoleteMapIds.add(event.getTaskId());
            LOG.info("Ignoring obsolete output of " + event.getTaskStatus() +
                     " map-task: '" + event.getTaskId() + "'");
          }
          break;
        case TIPFAILED:
          {
            neededOutputs.remove(event.idWithinJob());
            LOG.info("Ignoring output of failed map TIP: '" +
                     event.getTaskId() + "'");
          }
          break;
        }
      }
    }

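    // Background merge of 'closed' ramfs files to a single on-disk spill. It
    // only merges once enough closed files have accumulated
    // (MAX_INMEM_FILESYS_USE / MAX_INMEM_FILESIZE_FRACTION of them); any
    // failure is recorded in mergeThrowable for fetchOutputs to observe, and
    // mergeInProgress is cleared when the thread finishes either way.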
    private class InMemFSMergeThread extends Thread {
      private InMemoryFileSystem inMemFileSys;
      private LocalFileSystem localFileSys;
      private SequenceFile.Sorter sorter;

      public InMemFSMergeThread(InMemoryFileSystem inMemFileSys,
                                LocalFileSystem localFileSys,
                                SequenceFile.Sorter sorter) {
        this.inMemFileSys = inMemFileSys;
        this.localFileSys = localFileSys;
        this.sorter = sorter;
      }

      public void run() {
        LOG.info(reduceTask.getTaskId() + " Thread started: " + getName());
        try {
          //initiate merge
          Path[] inMemClosedFiles = inMemFileSys.getFiles(MAP_OUTPUT_FILTER);
          //Note that the above Path[] could be of length 0 if all copies are
          //in flight. So we make sure that we have some 'closed' map
          //output files to merge to get the benefit of in-memory merge
          if (inMemClosedFiles.length >=
              (int)(MAX_INMEM_FILESYS_USE/MAX_INMEM_FILESIZE_FRACTION)) {
            //name this output file same as the name of the first file that is
            //there in the current list of inmem files (this is guaranteed to
            //be absent on the disk currently. So we don't overwrite a prev.
            //created spill). Also we need to create the output file now since
            //it is not guaranteed that this file will be present after merge
            //is called (we delete empty sequence files as soon as we see them
            //in the merge method)

            //figure out the mapId
            int mapId = extractMapIdFromPathName(inMemClosedFiles[0]);
            Path outputPath = mapOutputFile.getInputFileForWrite(mapId,
                                  reduceTask.getTaskId(), ramfsMergeOutputSize);
            SequenceFile.Writer writer = sorter.cloneFileAttributes(
                inMemFileSys.makeQualified(inMemClosedFiles[0]),
                localFileSys.makeQualified(outputPath), null);
            SequenceFile.Sorter.RawKeyValueIterator rIter;
            try {
              rIter = sorter.merge(inMemClosedFiles, true,
                                   inMemClosedFiles.length,
                                   new Path(reduceTask.getTaskId()));
            } catch (Exception e) {
              //make sure that we delete the ondisk file that we created
              //earlier when we invoked cloneFileAttributes
              writer.close();
              localFileSys.delete(outputPath);
              throw new IOException(StringUtils.stringifyException(e));
            }
            sorter.writeFile(rIter, writer);
            writer.close();
            LOG.info(reduceTask.getTaskId() +
                     " Merge of the " + inMemClosedFiles.length +
                     " files in InMemoryFileSystem complete." +
                     " Local file is " + outputPath);
          }
          else {
            LOG.info(reduceTask.getTaskId() + " Nothing to merge from " +
                     inMemFileSys.getUri());
          }
        } catch (Throwable t) {
          LOG.warn(reduceTask.getTaskId() +
                   " Intermediate Merge of the inmemory files threw an exception: "
                   + StringUtils.stringifyException(t));
          ReduceCopier.this.mergeThrowable = t;
        }
        finally {
          mergeInProgress = false;
        }
      }
    }

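    // Matches map output spill files, which always end in ".out".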
    final private PathFilter MAP_OUTPUT_FILTER = new PathFilter() {
      public boolean accept(Path file) {
        return file.toString().endsWith(".out");
      }
    };
  }
}