|
@@ -1385,27 +1385,8 @@ class ReduceTask extends Task {
|
|
|
// Connect
|
|
|
URL url = mapOutputLoc.getOutputLocation();
|
|
|
URLConnection connection = url.openConnection();
|
|
|
-
|
|
|
- // generate hash of the url
|
|
|
- String msgToEncode = SecureShuffleUtils.buildMsgFrom(url);
|
|
|
- String encHash = SecureShuffleUtils.hashFromString(msgToEncode, jobTokenSecret);
|
|
|
-
|
|
|
- // put url hash into http header
|
|
|
- connection.addRequestProperty(
|
|
|
- SecureShuffleUtils.HTTP_HEADER_URL_HASH, encHash);
|
|
|
|
|
|
- InputStream input = getInputStream(connection, shuffleConnectionTimeout,
|
|
|
- shuffleReadTimeout);
|
|
|
-
|
|
|
- // get the replyHash which is HMac of the encHash we sent to the server
|
|
|
- String replyHash = connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH);
|
|
|
- if(replyHash==null) {
|
|
|
- throw new IOException("security validation of TT Map output failed");
|
|
|
- }
|
|
|
- LOG.debug("url="+msgToEncode+";encHash="+encHash+";replyHash="+replyHash);
|
|
|
- // verify that replyHash is HMac of encHash
|
|
|
- SecureShuffleUtils.verifyReply(replyHash, encHash, jobTokenSecret);
|
|
|
- LOG.info("for url="+msgToEncode+" sent hash and receievd reply");
|
|
|
+ InputStream input = setupSecureConnection(mapOutputLoc, connection);
|
|
|
|
|
|
// Validate header from map output
|
|
|
TaskAttemptID mapId = null;
|
|
@@ -1477,6 +1458,38 @@ class ReduceTask extends Task {
|
|
|
|
|
|
return mapOutput;
|
|
|
}
|
|
|
+
|
|
|
+ private InputStream setupSecureConnection(MapOutputLocation mapOutputLoc,
|
|
|
+ URLConnection connection) throws IOException {
|
|
|
+
|
|
|
+ // generate hash of the url
|
|
|
+ String msgToEncode =
|
|
|
+ SecureShuffleUtils.buildMsgFrom(connection.getURL());
|
|
|
+ String encHash = SecureShuffleUtils.hashFromString(msgToEncode,
|
|
|
+ jobTokenSecret);
|
|
|
+
|
|
|
+ // put url hash into http header
|
|
|
+ connection.setRequestProperty(
|
|
|
+ SecureShuffleUtils.HTTP_HEADER_URL_HASH, encHash);
|
|
|
+
|
|
|
+ InputStream input = getInputStream(connection, shuffleConnectionTimeout,
|
|
|
+ shuffleReadTimeout);
|
|
|
+
|
|
|
+ // get the replyHash which is HMac of the encHash we sent to the server
|
|
|
+ String replyHash = connection.getHeaderField(
|
|
|
+ SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH);
|
|
|
+ if(replyHash==null) {
|
|
|
+ throw new IOException("security validation of TT Map output failed");
|
|
|
+ }
|
|
|
+ if (LOG.isDebugEnabled())
|
|
|
+ LOG.debug("url="+msgToEncode+";encHash="+encHash+";replyHash="
|
|
|
+ +replyHash);
|
|
|
+ // verify that replyHash is HMac of encHash
|
|
|
+ SecureShuffleUtils.verifyReply(replyHash, encHash, jobTokenSecret);
|
|
|
+ if (LOG.isDebugEnabled())
|
|
|
+ LOG.debug("for url="+msgToEncode+" sent hash and receievd reply");
|
|
|
+ return input;
|
|
|
+ }
|
|
|
|
|
|
/**
|
|
|
* The connection establishment is attempted multiple times and is given up
|
|
@@ -1544,8 +1557,7 @@ class ReduceTask extends Task {
|
|
|
// Reconnect
|
|
|
try {
|
|
|
connection = mapOutputLoc.getOutputLocation().openConnection();
|
|
|
- input = getInputStream(connection, shuffleConnectionTimeout,
|
|
|
- shuffleReadTimeout);
|
|
|
+ input = setupSecureConnection(mapOutputLoc, connection);
|
|
|
} catch (IOException ioe) {
|
|
|
LOG.info("Failed reopen connection to fetch map-output from " +
|
|
|
mapOutputLoc.getHost());
|