@@ -21,11 +21,12 @@ import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.net.HttpURLConnection;
 import java.net.MalformedURLException;
 import java.net.URL;
-import java.net.HttpURLConnection;
 import java.net.URLConnection;
 import java.security.GeneralSecurityException;
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -53,7 +54,8 @@ import org.apache.hadoop.mapreduce.task.reduce.MapOutput.Type;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;

-@SuppressWarnings({"deprecation"})
+import com.google.common.annotations.VisibleForTesting;
+
 class Fetcher<K,V> extends Thread {

   private static final Log LOG = LogFactory.getLog(Fetcher.class);
@@ -199,6 +201,7 @@ class Fetcher<K,V> extends Thread {
     }
   }

+  @VisibleForTesting
   protected HttpURLConnection openConnection(URL url) throws IOException {
     HttpURLConnection conn = (HttpURLConnection) url.openConnection();
     if (sslShuffle) {
@@ -209,17 +212,18 @@ class Fetcher<K,V> extends Thread {
         throw new IOException(ex);
       }
       httpsConn.setHostnameVerifier(sslFactory.getHostnameVerifier());
-      }
-      return conn;
     }
-    
+    return conn;
+  }
+
   /**
    * The crux of the matter...
    *
    * @param host {@link MapHost} from which we need to
    *              shuffle available map-outputs.
    */
-  private void copyFromHost(MapHost host) throws IOException {
+  @VisibleForTesting
+  protected void copyFromHost(MapHost host) throws IOException {
     // Get completed maps on 'host'
     List<TaskAttemptID> maps = scheduler.getMapsForHost(host);

@@ -229,9 +233,9 @@ class Fetcher<K,V> extends Thread {
       return;
     }

-    LOG.debug("Fetcher " + id + " going to fetch from " + host);
-    for (TaskAttemptID tmp: maps) {
-      LOG.debug(tmp);
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("Fetcher " + id + " going to fetch from " + host + " for: "
+        + maps);
     }

     // List of maps to be fetched yet
@@ -304,17 +308,25 @@ class Fetcher<K,V> extends Thread {

       try {
         // Loop through available map-outputs and fetch them
-        // On any error, good becomes false and we exit after putting back
-        // the remaining maps to the yet_to_be_fetched list
-        boolean good = true;
-        while (!remaining.isEmpty() && good) {
-          good = copyMapOutput(host, input, remaining);
+        // On any error, failedTasks is not null and we exit
+        // after putting back the remaining maps to the
+        // yet_to_be_fetched list and marking the failed tasks.
+        TaskAttemptID[] failedTasks = null;
+        while (!remaining.isEmpty() && failedTasks == null) {
+          failedTasks = copyMapOutput(host, input, remaining);
+        }
+
+        if(failedTasks != null && failedTasks.length > 0) {
+          LOG.warn("copyMapOutput failed for tasks "+Arrays.toString(failedTasks));
+          for(TaskAttemptID left: failedTasks) {
+            scheduler.copyFailed(left, host, true);
+          }
         }

         IOUtils.cleanup(LOG, input);

         // Sanity check
-        if (good && !remaining.isEmpty()) {
+        if (failedTasks == null && !remaining.isEmpty()) {
           throw new IOException("server didn't return all expected map outputs: "
               + remaining.size() + " left.");
         }
@@ -323,10 +335,11 @@ class Fetcher<K,V> extends Thread {
         scheduler.putBackKnownMapOutput(host, left);
       }
     }
-
-  }
+  }

-  private boolean copyMapOutput(MapHost host,
+  private static TaskAttemptID[] EMPTY_ATTEMPT_ID_ARRAY = new TaskAttemptID[0];
+
+  private TaskAttemptID[] copyMapOutput(MapHost host,
                                 DataInputStream input,
                                 Set<TaskAttemptID> remaining) {
     MapOutput<K,V> mapOutput = null;
@@ -348,18 +361,21 @@ class Fetcher<K,V> extends Thread {
       } catch (IllegalArgumentException e) {
         badIdErrs.increment(1);
         LOG.warn("Invalid map id ", e);
-        return false;
+        //Don't know which one was bad, so consider all of them as bad
+        return remaining.toArray(new TaskAttemptID[remaining.size()]);
       }

       // Do some basic sanity verification
       if (!verifySanity(compressedLength, decompressedLength, forReduce,
           remaining, mapId)) {
-        return false;
+        return new TaskAttemptID[] {mapId};
       }

-      LOG.debug("header: " + mapId + ", len: " + compressedLength +
-          ", decomp len: " + decompressedLength);
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("header: " + mapId + ", len: " + compressedLength +
+            ", decomp len: " + decompressedLength);
+      }

       // Get the location for the map output - either in-memory or on-disk
       mapOutput = merger.reserve(mapId, decompressedLength, id);
@@ -367,7 +383,8 @@ class Fetcher<K,V> extends Thread {
       // Check if we can shuffle *now* ...
       if (mapOutput.getType() == Type.WAIT) {
         LOG.info("fetcher#" + id + " - MergerManager returned Status.WAIT ...");
-        return false;
+        //Not an error but wait to process data.
+        return EMPTY_ATTEMPT_ID_ARRAY;
       }

       // Go!
@@ -389,24 +406,27 @@ class Fetcher<K,V> extends Thread {
       // Note successful shuffle
       remaining.remove(mapId);
       metrics.successFetch();
-      return true;
+      return null;
     } catch (IOException ioe) {
       ioErrs.increment(1);
       if (mapId == null || mapOutput == null) {
         LOG.info("fetcher#" + id + " failed to read map header" +
                  mapId + " decomp: " +
                  decompressedLength + ", " + compressedLength, ioe);
-        return false;
+        if(mapId == null) {
+          return remaining.toArray(new TaskAttemptID[remaining.size()]);
+        } else {
+          return new TaskAttemptID[] {mapId};
+        }
       }

-      LOG.info("Failed to shuffle output of " + mapId +
+      LOG.warn("Failed to shuffle output of " + mapId +
                " from " + host.getHostName(), ioe);

       // Inform the shuffle-scheduler
       mapOutput.abort();
-      scheduler.copyFailed(mapId, host, true);
       metrics.failedFetch();
-      return new TaskAttemptID[] {mapId};
+      return new TaskAttemptID[] {mapId};
     }

   }
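
Note (not part of the patch): the @VisibleForTesting annotations added above are what let a unit test drive copyFromHost without touching the network, by overriding openConnection to hand back a prepared connection. The sketch below only illustrates that override pattern on a simplified, hypothetical stand-in class; the real Fetcher's constructor, scheduler, and merger wiring are omitted, and all names here are assumptions, not code from this change.

import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;

// Hypothetical, simplified stand-in for the connection-opening hook that the
// patch marks @VisibleForTesting on the real Fetcher.
class ConnectionSource {
  protected HttpURLConnection openConnection(URL url) throws IOException {
    return (HttpURLConnection) url.openConnection();
  }
}

// A test double overrides the hook and returns a canned connection (for
// example, a mock built with a mocking framework), so the copy path can be
// exercised without a running shuffle server.
class CannedConnectionSource extends ConnectionSource {
  private final HttpURLConnection canned;

  CannedConnectionSource(HttpURLConnection canned) {
    this.canned = canned;
  }

  @Override
  protected HttpURLConnection openConnection(URL url) {
    return canned;
  }
}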