|
@@ -887,9 +887,12 @@ public class ShuffleHandler extends AuxiliaryService {
|
|
|
}
|
|
|
// Check whether the shuffle version is compatible
|
|
|
if (!ShuffleHeader.DEFAULT_HTTP_HEADER_NAME.equals(
|
|
|
- request.getHeader(ShuffleHeader.HTTP_HEADER_NAME))
|
|
|
+ request.headers() != null ?
|
|
|
+ request.headers().get(ShuffleHeader.HTTP_HEADER_NAME) : null)
|
|
|
|| !ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION.equals(
|
|
|
- request.getHeader(ShuffleHeader.HTTP_HEADER_VERSION))) {
|
|
|
+ request.headers() != null ?
|
|
|
+ request.headers()
|
|
|
+ .get(ShuffleHeader.HTTP_HEADER_VERSION) : null)) {
|
|
|
sendError(ctx, "Incompatible shuffle request version", BAD_REQUEST);
|
|
|
}
|
|
|
final Map<String,List<String>> q =
|
|
@@ -1117,12 +1120,12 @@ public class ShuffleHandler extends AuxiliaryService {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("Setting connection close header...");
|
|
|
}
|
|
|
- response.setHeader(HttpHeaders.CONNECTION, CONNECTION_CLOSE);
|
|
|
+ response.headers().set(HttpHeaders.CONNECTION, CONNECTION_CLOSE);
|
|
|
} else {
|
|
|
- response.setHeader(HttpHeaders.CONTENT_LENGTH,
|
|
|
+ response.headers().set(HttpHeaders.CONTENT_LENGTH,
|
|
|
String.valueOf(contentLength));
|
|
|
- response.setHeader(HttpHeaders.CONNECTION, HttpHeaders.KEEP_ALIVE);
|
|
|
- response.setHeader(HttpHeaders.KEEP_ALIVE, "timeout="
|
|
|
+ response.headers().set(HttpHeaders.CONNECTION, HttpHeaders.KEEP_ALIVE);
|
|
|
+ response.headers().set(HttpHeaders.KEEP_ALIVE, "timeout="
|
|
|
+ connectionKeepAliveTimeOut);
|
|
|
LOG.info("Content Length in shuffle : " + contentLength);
|
|
|
}
|
|
@@ -1150,7 +1153,7 @@ public class ShuffleHandler extends AuxiliaryService {
|
|
|
String enc_str = SecureShuffleUtils.buildMsgFrom(requestUri);
|
|
|
// hash from the fetcher
|
|
|
String urlHashStr =
|
|
|
- request.getHeader(SecureShuffleUtils.HTTP_HEADER_URL_HASH);
|
|
|
+ request.headers().get(SecureShuffleUtils.HTTP_HEADER_URL_HASH);
|
|
|
if (urlHashStr == null) {
|
|
|
LOG.info("Missing header hash for " + appid);
|
|
|
throw new IOException("fetcher cannot be authenticated");
|
|
@@ -1166,11 +1169,12 @@ public class ShuffleHandler extends AuxiliaryService {
|
|
|
String reply =
|
|
|
SecureShuffleUtils.generateHash(urlHashStr.getBytes(Charsets.UTF_8),
|
|
|
tokenSecret);
|
|
|
- response.setHeader(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH, reply);
|
|
|
+ response.headers().set(
|
|
|
+ SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH, reply);
|
|
|
// Put shuffle version into http header
|
|
|
- response.setHeader(ShuffleHeader.HTTP_HEADER_NAME,
|
|
|
+ response.headers().set(ShuffleHeader.HTTP_HEADER_NAME,
|
|
|
ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
|
|
|
- response.setHeader(ShuffleHeader.HTTP_HEADER_VERSION,
|
|
|
+ response.headers().set(ShuffleHeader.HTTP_HEADER_VERSION,
|
|
|
ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
int len = reply.length();
|
|
@@ -1236,11 +1240,11 @@ public class ShuffleHandler extends AuxiliaryService {
|
|
|
protected void sendError(ChannelHandlerContext ctx, String message,
|
|
|
HttpResponseStatus status) {
|
|
|
HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
|
|
|
- response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
|
|
|
+ response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");
|
|
|
// Put shuffle version into http header
|
|
|
- response.setHeader(ShuffleHeader.HTTP_HEADER_NAME,
|
|
|
+ response.headers().set(ShuffleHeader.HTTP_HEADER_NAME,
|
|
|
ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
|
|
|
- response.setHeader(ShuffleHeader.HTTP_HEADER_VERSION,
|
|
|
+ response.headers().set(ShuffleHeader.HTTP_HEADER_VERSION,
|
|
|
ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
|
|
|
response.setContent(
|
|
|
ChannelBuffers.copiedBuffer(message, CharsetUtil.UTF_8));
|