
MAPREDUCE-7431. ShuffleHandler refactor and fix after Netty4 upgrade. (#5311)

Tamas Domok 2 years ago
parent commit 151b71d7af
11 files changed, 1899 additions and 2127 deletions
  1. + 6 - 0      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/pom.xml
  2. + 715 - 0    hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleChannelHandler.java
  3. + 140 - 0    hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleChannelHandlerContext.java
  4. + 74 - 0     hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleChannelInitializer.java
  5. + 119 - 913  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
  6. + 562 - 0    hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleChannelHandler.java
  7. + 30 - 1212  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
  8. + 172 - 0    hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandlerBase.java
  9. + 27 - 0     hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/resources/cert.pem
  10. + 52 - 0    hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/resources/key.pem
  11. + 2 - 2     hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/resources/log4j.properties

+ 6 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/pom.xml

@@ -55,6 +55,12 @@
       <groupId>${leveldbjni.group}</groupId>
       <artifactId>leveldbjni-all</artifactId>
     </dependency>
+    <dependency>
+      <groupId>ch.qos.logback</groupId>
+      <artifactId>logback-classic</artifactId>
+      <version>1.1.2</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
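
Note on the new test dependency: logback-classic is added at test scope only, presumably to act as the SLF4J backend while the refactored Netty tests run. A minimal, hypothetical logback-test.xml (logback reads this file first from the test classpath; the pattern below is illustrative, not part of this change) might look like:

<configuration>
  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
    <encoder>
      <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
    </encoder>
  </appender>
  <root level="DEBUG">
    <appender-ref ref="STDOUT"/>
  </root>
</configuration>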

+ 715 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleChannelHandler.java

@@ -0,0 +1,715 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.TooLongFrameException;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.DefaultHttpResponse;
+import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpUtil;
+import io.netty.handler.codec.http.LastHttpContent;
+import io.netty.handler.codec.http.QueryStringDecoder;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.util.CharsetUtil;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.net.URL;
+import java.nio.channels.ClosedChannelException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.crypto.SecretKey;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.SecureIOUtils;
+import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
+import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader;
+import org.apache.hadoop.thirdparty.com.google.common.base.Charsets;
+import org.eclipse.jetty.http.HttpHeader;
+
+import static io.netty.buffer.Unpooled.wrappedBuffer;
+import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE;
+import static io.netty.handler.codec.http.HttpMethod.GET;
+import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
+import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
+import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
+import static io.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED;
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static io.netty.handler.codec.http.HttpResponseStatus.UNAUTHORIZED;
+import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+import static org.apache.hadoop.mapred.ShuffleHandler.AUDITLOG;
+import static org.apache.hadoop.mapred.ShuffleHandler.CONNECTION_CLOSE;
+import static org.apache.hadoop.mapred.ShuffleHandler.FETCH_RETRY_DELAY;
+import static org.apache.hadoop.mapred.ShuffleHandler.IGNORABLE_ERROR_MESSAGE;
+import static org.apache.hadoop.mapred.ShuffleHandler.RETRY_AFTER_HEADER;
+import static org.apache.hadoop.mapred.ShuffleHandler.TIMEOUT_HANDLER;
+import static org.apache.hadoop.mapred.ShuffleHandler.TOO_MANY_REQ_STATUS;
+import static org.apache.hadoop.mapred.ShuffleHandler.LOG;
+
+/**
+ * ShuffleChannelHandler verifies the map request, then serves the attempts in an HTTP stream.
+ * Before each attempt, a serialised ShuffleHeader object is written with the details.
+ *
+ * <pre>
+ * Example Request
+ * ===================
+ * GET /mapOutput?job=job_1111111111111_0001&amp;reduce=0&amp;
+ *     map=attempt_1111111111111_0001_m_000001_0,
+ *     attempt_1111111111111_0002_m_000002_0,
+ *     attempt_1111111111111_0003_m_000003_0 HTTP/1.1
+ * name: mapreduce
+ * version: 1.0.0
+ * UrlHash: 9zS++qE0/7/D2l1Rg0TqRoSguAk=
+ *
+ * Example Response
+ * ===================
+ * HTTP/1.1 200 OK
+ * ReplyHash: GcuojWkAxXUyhZHPnwoV/MW2tGA=
+ * name: mapreduce
+ * version: 1.0.0
+ * connection: close
+ * content-length: 138
+ *
+ * +--------+-------------------------------------------------+----------------+
+ * |00000000| 25 61 74 74 65 6d 70 74 5f 31 31 31 31 31 31 31 |%attempt_1111111|
+ * |00000010| 31 31 31 31 31 31 5f 30 30 30 31 5f 6d 5f 30 30 |111111_0001_m_00|
+ * |00000020| 30 30 30 31 5f 30 05 0a 00                      |0001_0...       |
+ * +--------+-------------------------------------------------+----------------+
+ * |00000000| 61 61 61 61 61                                  |aaaaa           |
+ * +--------+-------------------------------------------------+----------------+
+ * |00000000| 25 61 74 74 65 6d 70 74 5f 31 31 31 31 31 31 31 |%attempt_1111111|
+ * |00000010| 31 31 31 31 31 31 5f 30 30 30 32 5f 6d 5f 30 30 |111111_0002_m_00|
+ * |00000020| 30 30 30 32 5f 30 05 0a 00                      |0002_0...       |
+ * +--------+-------------------------------------------------+----------------+
+ * |00000000| 62 62 62 62 62                                  |bbbbb           |
+ * +--------+-------------------------------------------------+----------------+
+ * |00000000| 25 61 74 74 65 6d 70 74 5f 31 31 31 31 31 31 31 |%attempt_1111111|
+ * |00000010| 31 31 31 31 31 31 5f 30 30 30 33 5f 6d 5f 30 30 |111111_0003_m_00|
+ * |00000020| 30 30 30 33 5f 30 05 0a 00                      |0003_0...       |
+ * +--------+-------------------------------------------------+----------------+
+ * |00000000| 63 63 63 63 63                                  |ccccc           |
+ * +--------+-------------------------------------------------+----------------+
+ * </pre>
+ */
+public class ShuffleChannelHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
+  private final ShuffleChannelHandlerContext handlerCtx;
+
+  ShuffleChannelHandler(ShuffleChannelHandlerContext ctx) {
+    handlerCtx = ctx;
+  }
+
+  private List<String> splitMaps(List<String> mapq) {
+    if (null == mapq) {
+      return null;
+    }
+    final List<String> ret = new ArrayList<>();
+    for (String s : mapq) {
+      Collections.addAll(ret, s.split(","));
+    }
+    return ret;
+  }
+
+  @Override
+  public void channelActive(ChannelHandlerContext ctx)
+      throws Exception {
+    LOG.debug("Executing channelActive; channel='{}'", ctx.channel().id());
+    int numConnections = handlerCtx.activeConnections.incrementAndGet();
+    if ((handlerCtx.maxShuffleConnections > 0) &&
+        (numConnections > handlerCtx.maxShuffleConnections)) {
+      LOG.info(String.format("Current number of shuffle connections (%d) is " +
+              "greater than the max allowed shuffle connections (%d)",
+          numConnections, handlerCtx.maxShuffleConnections));
+
+      Map<String, String> headers = new HashMap<>(1);
+      // notify fetchers to backoff for a while before closing the connection
+      // if the shuffle connection limit is hit. Fetchers are expected to
+      // handle this notification gracefully, that is, not treating this as a
+      // fetch failure.
+      headers.put(RETRY_AFTER_HEADER, String.valueOf(FETCH_RETRY_DELAY));
+      sendError(ctx, "", TOO_MANY_REQ_STATUS, headers);
+    } else {
+      super.channelActive(ctx);
+      handlerCtx.allChannels.add(ctx.channel());
+      LOG.debug("Added channel: {}, channel id: {}. Accepted number of connections={}",
+          ctx.channel(), ctx.channel().id(), handlerCtx.activeConnections.get());
+    }
+  }
+
+  @Override
+  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+    LOG.debug("Executing channelInactive; channel='{}'", ctx.channel().id());
+    super.channelInactive(ctx);
+    int noOfConnections = handlerCtx.activeConnections.decrementAndGet();
+    LOG.debug("New value of Accepted number of connections={}", noOfConnections);
+  }
+
+  @Override
+  public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) {
+    Channel channel = ctx.channel();
+    LOG.debug("Received HTTP request: {}, channel='{}'", request, channel.id());
+
+    if (request.method() != GET) {
+      sendError(ctx, METHOD_NOT_ALLOWED);
+      return;
+    }
+    // Check whether the shuffle version is compatible
+    String shuffleVersion = ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION;
+    String httpHeaderName = ShuffleHeader.DEFAULT_HTTP_HEADER_NAME;
+    if (request.headers() != null) {
+      shuffleVersion = request.headers().get(ShuffleHeader.HTTP_HEADER_VERSION);
+      httpHeaderName = request.headers().get(ShuffleHeader.HTTP_HEADER_NAME);
+      LOG.debug("Received from request header: ShuffleVersion={} header name={}, channel id: {}",
+          shuffleVersion, httpHeaderName, channel.id());
+    }
+    if (request.headers() == null ||
+        !ShuffleHeader.DEFAULT_HTTP_HEADER_NAME.equals(httpHeaderName) ||
+        !ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION.equals(shuffleVersion)) {
+      sendError(ctx, "Incompatible shuffle request version", BAD_REQUEST);
+      return;
+    }
+    final Map<String, List<String>> q =
+        new QueryStringDecoder(request.uri()).parameters();
+
+    final List<String> keepAliveList = q.get("keepAlive");
+    boolean keepAliveParam = false;
+    if (keepAliveList != null && keepAliveList.size() == 1) {
+      keepAliveParam = Boolean.parseBoolean(keepAliveList.get(0));
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("KeepAliveParam: {} : {}, channel id: {}",
+            keepAliveList, keepAliveParam, channel.id());
+      }
+    }
+    final List<String> mapIds = splitMaps(q.get("map"));
+    final List<String> reduceQ = q.get("reduce");
+    final List<String> jobQ = q.get("job");
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("RECV: " + request.uri() +
+          "\n  mapId: " + mapIds +
+          "\n  reduceId: " + reduceQ +
+          "\n  jobId: " + jobQ +
+          "\n  keepAlive: " + keepAliveParam +
+          "\n  channel id: " + channel.id());
+    }
+
+    if (mapIds == null || reduceQ == null || jobQ == null) {
+      sendError(ctx, "Required param job, map and reduce", BAD_REQUEST);
+      return;
+    }
+    if (reduceQ.size() != 1 || jobQ.size() != 1) {
+      sendError(ctx, "Too many job/reduce parameters", BAD_REQUEST);
+      return;
+    }
+
+    int reduceId;
+    String jobId;
+    try {
+      reduceId = Integer.parseInt(reduceQ.get(0));
+      jobId = jobQ.get(0);
+    } catch (NumberFormatException e) {
+      sendError(ctx, "Bad reduce parameter", BAD_REQUEST);
+      return;
+    } catch (IllegalArgumentException e) {
+      sendError(ctx, "Bad job parameter", BAD_REQUEST);
+      return;
+    }
+    final String reqUri = request.uri();
+    if (null == reqUri) {
+      // TODO? add upstream?
+      sendError(ctx, FORBIDDEN);
+      return;
+    }
+    HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
+    try {
+      verifyRequest(jobId, ctx, request, response,
+          new URL("http", "", handlerCtx.port, reqUri));
+    } catch (IOException e) {
+      LOG.warn("Shuffle failure ", e);
+      sendError(ctx, e.getMessage(), UNAUTHORIZED);
+      return;
+    }
+
+    Map<String, MapOutputInfo> mapOutputInfoMap = new HashMap<>();
+    ChannelPipeline pipeline = channel.pipeline();
+    ShuffleHandler.TimeoutHandler timeoutHandler =
+        (ShuffleHandler.TimeoutHandler)pipeline.get(TIMEOUT_HANDLER);
+    timeoutHandler.setEnabledTimeout(false);
+    String user = handlerCtx.userRsrc.get(jobId);
+
+    try {
+      populateHeaders(mapIds, jobId, user, reduceId,
+          response, keepAliveParam, mapOutputInfoMap);
+    } catch(IOException e) {
+      LOG.error("Shuffle error while populating headers. Channel id: " + channel.id(), e);
+      sendError(ctx, getErrorMessage(e), INTERNAL_SERVER_ERROR);
+      return;
+    }
+
+    channel.write(response);
+
+    //Initialize one ReduceContext object per channelRead call
+    boolean keepAlive = keepAliveParam || handlerCtx.connectionKeepAliveEnabled;
+    ReduceContext reduceContext = new ReduceContext(mapIds, reduceId, ctx,
+        user, mapOutputInfoMap, jobId, keepAlive);
+
+    sendMap(reduceContext);
+  }
+
+  /**
+   * Calls sendMapOutput for the mapId pointed by ReduceContext.mapsToSend
+   * and increments it. This method is first called by messageReceived()
+   * maxSessionOpenFiles times and then on the completion of every
+   * sendMapOutput operation. This limits the number of open files on a node,
+ * which can get really large (exhausting file descriptors on the NM) if all
+ * sendMapOutputs are called in one go, as was done prior to this change.
+   * @param reduceContext used to call sendMapOutput with correct params.
+   */
+  public void sendMap(ReduceContext reduceContext) {
+    LOG.trace("Executing sendMap; channel='{}'", reduceContext.ctx.channel().id());
+    if (reduceContext.getMapsToSend().get() <
+        reduceContext.getMapIds().size()) {
+      int nextIndex = reduceContext.getMapsToSend().getAndIncrement();
+      String mapId = reduceContext.getMapIds().get(nextIndex);
+
+      try {
+        MapOutputInfo info = reduceContext.getInfoMap().get(mapId);
+        if (info == null) {
+          info = getMapOutputInfo(mapId, reduceContext.getReduceId(),
+              reduceContext.getJobId(), reduceContext.getUser());
+        }
+        LOG.trace("Calling sendMapOutput; channel='{}'", reduceContext.ctx.channel().id());
+        ChannelFuture nextMap = sendMapOutput(
+            reduceContext.getCtx().channel(),
+            reduceContext.getUser(), mapId,
+            reduceContext.getReduceId(), info);
+        nextMap.addListener(new ReduceMapFileCount(this, reduceContext));
+      } catch (IOException e) {
+        LOG.error("Shuffle error: {}; channel={}", e, reduceContext.ctx.channel().id());
+
+        // It is not possible to sendError, the success HttpResponse has been already sent
+        reduceContext.ctx.channel().close();
+      }
+    }
+  }
+
+  private String getErrorMessage(Throwable t) {
+    StringBuilder sb = new StringBuilder(t.getMessage());
+    while (t.getCause() != null) {
+      sb.append(t.getCause().getMessage());
+      t = t.getCause();
+    }
+    return sb.toString();
+  }
+
+  protected MapOutputInfo getMapOutputInfo(String mapId, int reduce, String jobId, String user)
+      throws IOException {
+    ShuffleHandler.AttemptPathInfo pathInfo;
+    try {
+      ShuffleHandler.AttemptPathIdentifier identifier = new ShuffleHandler.AttemptPathIdentifier(
+          jobId, user, mapId);
+      pathInfo = handlerCtx.pathCache.get(identifier);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Retrieved pathInfo for " + identifier +
+            " check for corresponding loaded messages to determine whether" +
+            " it was loaded or cached");
+      }
+    } catch (ExecutionException e) {
+      if (e.getCause() instanceof IOException) {
+        throw (IOException) e.getCause();
+      } else {
+        throw new RuntimeException(e.getCause());
+      }
+    }
+
+    IndexRecord info =
+        handlerCtx.indexCache.getIndexInformation(mapId, reduce, pathInfo.indexPath, user);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("getMapOutputInfo: jobId=" + jobId + ", mapId=" + mapId +
+          ",dataFile=" + pathInfo.dataPath + ", indexFile=" +
+          pathInfo.indexPath);
+      LOG.debug("getMapOutputInfo: startOffset={}, partLength={} rawLength={}",
+          info.startOffset, info.partLength, info.rawLength);
+    }
+
+    return new MapOutputInfo(pathInfo.dataPath, info);
+  }
+
+  protected void populateHeaders(List<String> mapIds, String jobId,
+                                 String user, int reduce, HttpResponse response,
+                                 boolean keepAliveParam,
+                                 Map<String, MapOutputInfo> mapOutputInfoMap)
+      throws IOException {
+
+    long contentLength = 0;
+    for (String mapId : mapIds) {
+      MapOutputInfo outputInfo = getMapOutputInfo(mapId, reduce, jobId, user);
+      if (mapOutputInfoMap.size() < handlerCtx.mapOutputMetaInfoCacheSize) {
+        mapOutputInfoMap.put(mapId, outputInfo);
+      }
+
+      ShuffleHeader header =
+          new ShuffleHeader(mapId, outputInfo.indexRecord.partLength,
+              outputInfo.indexRecord.rawLength, reduce);
+      DataOutputBuffer dob = new DataOutputBuffer();
+      header.write(dob);
+      contentLength += outputInfo.indexRecord.partLength;
+      contentLength += dob.getLength();
+
+      // verify file access to data file to send an actually correct http error
+      final File spillFile = new File(outputInfo.mapOutputFileName.toString());
+      RandomAccessFile r = SecureIOUtils.openForRandomRead(spillFile, "r", user, null);
+      r.close();
+    }
+
+    // Now set the response headers.
+    setResponseHeaders(response, keepAliveParam, contentLength);
+
+    // this audit log is disabled by default,
+    // to turn it on please enable this audit log
+    // on log4j.properties by uncommenting the setting
+    if (AUDITLOG.isDebugEnabled()) {
+      StringBuilder sb = new StringBuilder("shuffle for ");
+      sb.append(jobId).append(" reducer ").append(reduce);
+      sb.append(" length ").append(contentLength);
+      if (AUDITLOG.isTraceEnabled()) {
+        // For trace level logging, append the list of mappers
+        sb.append(" mappers: ").append(mapIds);
+        AUDITLOG.trace(sb.toString());
+      } else {
+        AUDITLOG.debug(sb.toString());
+      }
+    }
+  }
+
+  protected void setResponseHeaders(HttpResponse response,
+                                    boolean keepAliveParam, long contentLength) {
+    if (!handlerCtx.connectionKeepAliveEnabled && !keepAliveParam) {
+      response.headers().set(HttpHeader.CONNECTION.asString(), CONNECTION_CLOSE);
+    } else {
+      response.headers().set(HttpHeader.CONNECTION.asString(),
+          HttpHeader.KEEP_ALIVE.asString());
+      response.headers().set(HttpHeader.KEEP_ALIVE.asString(),
+          "timeout=" + handlerCtx.connectionKeepAliveTimeOut);
+    }
+
+    // Content length must be set (https://www.rfc-editor.org/rfc/rfc7230#section-3.3.3)
+    HttpUtil.setContentLength(response, contentLength);
+  }
+
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  static class MapOutputInfo {
+    final Path mapOutputFileName;
+    final IndexRecord indexRecord;
+
+    MapOutputInfo(Path mapOutputFileName, IndexRecord indexRecord) {
+      this.mapOutputFileName = mapOutputFileName;
+      this.indexRecord = indexRecord;
+    }
+  }
+
+  protected void verifyRequest(String appid, ChannelHandlerContext ctx,
+                               HttpRequest request, HttpResponse response, URL requestUri)
+      throws IOException {
+    SecretKey tokenSecret = handlerCtx.secretManager.retrieveTokenSecret(appid);
+    if (null == tokenSecret) {
+      LOG.info("Request for unknown token {}, channel id: {}", appid, ctx.channel().id());
+      throw new IOException("Could not find jobid");
+    }
+    // encrypting URL
+    String encryptedURL = SecureShuffleUtils.buildMsgFrom(requestUri);
+    // hash from the fetcher
+    String urlHashStr =
+        request.headers().get(SecureShuffleUtils.HTTP_HEADER_URL_HASH);
+    if (urlHashStr == null) {
+      LOG.info("Missing header hash for {}, channel id: {}", appid, ctx.channel().id());
+      throw new IOException("fetcher cannot be authenticated");
+    }
+    if (LOG.isDebugEnabled()) {
+      int len = urlHashStr.length();
+      LOG.debug("Verifying request. encryptedURL:{}, hash:{}, channel id: " +
+              "{}", encryptedURL,
+          urlHashStr.substring(len - len / 2, len - 1), ctx.channel().id());
+    }
+    // verify - throws exception
+    SecureShuffleUtils.verifyReply(urlHashStr, encryptedURL, tokenSecret);
+    // verification passed - encode the reply
+    String reply = SecureShuffleUtils.generateHash(urlHashStr.getBytes(Charsets.UTF_8),
+        tokenSecret);
+    response.headers().set(
+        SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH, reply);
+    // Put shuffle version into http header
+    response.headers().set(ShuffleHeader.HTTP_HEADER_NAME,
+        ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+    response.headers().set(ShuffleHeader.HTTP_HEADER_VERSION,
+        ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
+    if (LOG.isDebugEnabled()) {
+      int len = reply.length();
+      LOG.debug("Fetcher request verified. " +
+              "encryptedURL: {}, reply: {}, channel id: {}",
+          encryptedURL, reply.substring(len - len / 2, len - 1),
+          ctx.channel().id());
+    }
+  }
+
+  public static ByteBuf shuffleHeaderToBytes(ShuffleHeader header) throws IOException {
+    final DataOutputBuffer dob = new DataOutputBuffer();
+    header.write(dob);
+    return wrappedBuffer(dob.getData(), 0, dob.getLength());
+  }
+
+  protected ChannelFuture sendMapOutput(Channel ch, String user, String mapId, int reduce,
+                                        MapOutputInfo mapOutputInfo)
+      throws IOException {
+    final IndexRecord info = mapOutputInfo.indexRecord;
+    ch.write(shuffleHeaderToBytes(
+        new ShuffleHeader(mapId, info.partLength, info.rawLength, reduce)));
+    final File spillFile =
+        new File(mapOutputInfo.mapOutputFileName.toString());
+    RandomAccessFile spill = SecureIOUtils.openForRandomRead(spillFile, "r", user, null);
+    ChannelFuture writeFuture;
+    if (ch.pipeline().get(SslHandler.class) == null) {
+      final FadvisedFileRegion partition = new FadvisedFileRegion(spill,
+          info.startOffset, info.partLength, handlerCtx.manageOsCache, handlerCtx.readaheadLength,
+          handlerCtx.readaheadPool, spillFile.getAbsolutePath(),
+          handlerCtx.shuffleBufferSize, handlerCtx.shuffleTransferToAllowed);
+      writeFuture = ch.writeAndFlush(partition);
+      // TODO error handling; distinguish IO/connection failures,
+      //      attribute to appropriate spill output
+      writeFuture.addListener((ChannelFutureListener) future -> {
+        if (future.isSuccess()) {
+          partition.transferSuccessful();
+        }
+        partition.deallocate();
+      });
+    } else {
+      // HTTPS cannot be done with zero copy.
+      final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill,
+          info.startOffset, info.partLength, handlerCtx.sslFileBufferSize,
+          handlerCtx.manageOsCache, handlerCtx.readaheadLength, handlerCtx.readaheadPool,
+          spillFile.getAbsolutePath());
+      writeFuture = ch.writeAndFlush(chunk);
+    }
+
+    handlerCtx.metrics.shuffleConnections.incr();
+    handlerCtx.metrics.shuffleOutputBytes.incr(info.partLength); // optimistic
+    return writeFuture;
+  }
+
+  protected void sendError(ChannelHandlerContext ctx,
+                           HttpResponseStatus status) {
+    sendError(ctx, "", status);
+  }
+
+  protected void sendError(ChannelHandlerContext ctx, String message,
+                           HttpResponseStatus status) {
+    sendError(ctx, message, status, Collections.emptyMap());
+  }
+
+  protected void sendError(ChannelHandlerContext ctx, String msg,
+                           HttpResponseStatus status, Map<String, String> headers) {
+    FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, status,
+        Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8));
+    response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");
+    // Put shuffle version into http header
+    response.headers().set(ShuffleHeader.HTTP_HEADER_NAME,
+        ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+    response.headers().set(ShuffleHeader.HTTP_HEADER_VERSION,
+        ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
+    for (Map.Entry<String, String> header : headers.entrySet()) {
+      response.headers().set(header.getKey(), header.getValue());
+    }
+    HttpUtil.setContentLength(response, response.content().readableBytes());
+
+    // Close the connection as soon as the error message is sent.
+    ctx.channel().writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
+    // TODO: missing keep-alive handling
+  }
+
+  @Override
+  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
+      throws Exception {
+    Channel ch = ctx.channel();
+    if (cause instanceof TooLongFrameException) {
+      LOG.trace("TooLongFrameException, channel id: {}", ch.id());
+      sendError(ctx, BAD_REQUEST);
+      return;
+    } else if (cause instanceof IOException) {
+      if (cause instanceof ClosedChannelException) {
+        LOG.debug("Ignoring closed channel error, channel id: " + ch.id(), cause);
+        return;
+      }
+      String message = String.valueOf(cause.getMessage());
+      if (IGNORABLE_ERROR_MESSAGE.matcher(message).matches()) {
+        LOG.debug("Ignoring client socket close, channel id: " + ch.id(), cause);
+        return;
+      }
+    }
+
+    LOG.error("Shuffle error. Channel id: " + ch.id(), cause);
+    if (ch.isActive()) {
+      sendError(ctx, INTERNAL_SERVER_ERROR);
+    }
+  }
+
+  /**
+   * Maintain parameters per messageReceived() Netty context.
+ * Allows sendMapOutput calls from operationComplete().
+   */
+  public static class ReduceContext {
+    private final List<String> mapIds;
+    private final AtomicInteger mapsToWait;
+    private final AtomicInteger mapsToSend;
+    private final int reduceId;
+    private final ChannelHandlerContext ctx;
+    private final String user;
+    private final Map<String, ShuffleChannelHandler.MapOutputInfo> infoMap;
+    private final String jobId;
+    private final boolean keepAlive;
+
+    ReduceContext(List<String> mapIds, int rId,
+                  ChannelHandlerContext context, String usr,
+                  Map<String, ShuffleChannelHandler.MapOutputInfo> mapOutputInfoMap,
+                  String jobId, boolean keepAlive) {
+
+      this.mapIds = mapIds;
+      this.reduceId = rId;
+      /*
+       * Atomic count for tracking the no. of map outputs that are yet to
+       * complete. Multiple futureListeners' operationComplete() can decrement
+       * this value asynchronously. It is used to decide when the channel should
+       * be closed.
+       */
+      this.mapsToWait = new AtomicInteger(mapIds.size());
+      /*
+       * Atomic count for tracking the no. of map outputs that have been sent.
+       * Multiple sendMap() calls can increment this value
+       * asynchronously. Used to decide which mapId should be sent next.
+       */
+      this.mapsToSend = new AtomicInteger(0);
+      this.ctx = context;
+      this.user = usr;
+      this.infoMap = mapOutputInfoMap;
+      this.jobId = jobId;
+      this.keepAlive = keepAlive;
+    }
+
+    public int getReduceId() {
+      return reduceId;
+    }
+
+    public ChannelHandlerContext getCtx() {
+      return ctx;
+    }
+
+    public String getUser() {
+      return user;
+    }
+
+    public Map<String, ShuffleChannelHandler.MapOutputInfo> getInfoMap() {
+      return infoMap;
+    }
+
+    public String getJobId() {
+      return jobId;
+    }
+
+    public List<String> getMapIds() {
+      return mapIds;
+    }
+
+    public AtomicInteger getMapsToSend() {
+      return mapsToSend;
+    }
+
+    public AtomicInteger getMapsToWait() {
+      return mapsToWait;
+    }
+
+    public boolean getKeepAlive() {
+      return keepAlive;
+    }
+  }
+
+  static class ReduceMapFileCount implements ChannelFutureListener {
+    private final ShuffleChannelHandler handler;
+    private final ReduceContext reduceContext;
+
+    ReduceMapFileCount(ShuffleChannelHandler handler, ReduceContext rc) {
+      this.handler = handler;
+      this.reduceContext = rc;
+    }
+
+    @Override
+    public void operationComplete(ChannelFuture future) throws Exception {
+      LOG.trace("SendMap operation complete; mapsToWait='{}', channel='{}'",
+          this.reduceContext.getMapsToWait().get(), future.channel().id());
+      if (!future.isSuccess()) {
+        LOG.error("Future is unsuccessful. channel='{}' Cause: ",
+            future.channel().id(), future.cause());
+        future.channel().close();
+        return;
+      }
+      int waitCount = this.reduceContext.getMapsToWait().decrementAndGet();
+      if (waitCount == 0) {
+        ChannelFuture lastContentFuture =
+            future.channel().writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
+        handler.handlerCtx.metrics.operationComplete(future);
+
+        // Let the idle timer handler close keep-alive connections
+        if (reduceContext.getKeepAlive()) {
+          LOG.trace("SendMap operation complete, keeping alive the connection; channel='{}'",
+              future.channel().id());
+          ChannelPipeline pipeline = future.channel().pipeline();
+          ShuffleHandler.TimeoutHandler timeoutHandler =
+              (ShuffleHandler.TimeoutHandler)pipeline.get(TIMEOUT_HANDLER);
+          timeoutHandler.setEnabledTimeout(true);
+        } else {
+          LOG.trace("SendMap operation complete, closing connection; channel='{}'",
+              future.channel().id());
+          lastContentFuture.addListener(ChannelFutureListener.CLOSE);
+        }
+      } else {
+        LOG.trace("SendMap operation complete, waitCount > 0, " +
+                "invoking sendMap with reduceContext; channel='{}'",
+            future.channel().id());
+        handler.sendMap(reduceContext);
+      }
+    }
+  }
+}
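
For reference, the UrlHash/ReplyHash handshake shown in the class javadoc above is driven by SecureShuffleUtils: the fetcher signs the canonicalized request URL with the job token secret and sends the result as the UrlHash header, and verifyRequest() recomputes and compares that hash before answering with ReplyHash. A minimal sketch of the fetcher-side computation, mirroring how the new tests in this change build their requests (the wrapping class and variable names are illustrative):

import java.net.URL;
import javax.crypto.SecretKey;
import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;

final class UrlHashSketch {
  // Computes the value of the UrlHash request header for a shuffle URI.
  static String urlHash(SecretKey jobTokenSecret, int port, String uri) throws Exception {
    // buildMsgFrom() canonicalizes the URL into the exact string that gets signed.
    String msgToEncode = SecureShuffleUtils.buildMsgFrom(new URL("http", "", port, uri));
    // hashFromString() returns the base64-encoded HMAC that the server verifies.
    return SecureShuffleUtils.hashFromString(msgToEncode, jobTokenSecret);
  }
}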

+ 140 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleChannelHandlerContext.java

@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import io.netty.channel.group.ChannelGroup;
+
+import org.apache.hadoop.thirdparty.com.google.common.cache.LoadingCache;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.util.Shell;
+
+import static org.apache.hadoop.mapred.ShuffleHandler.DEFAULT_MAX_SHUFFLE_CONNECTIONS;
+import static org.apache.hadoop.mapred.ShuffleHandler.DEFAULT_SHUFFLE_BUFFER_SIZE;
+import static org.apache.hadoop.mapred.ShuffleHandler.DEFAULT_SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED;
+import static org.apache.hadoop.mapred.ShuffleHandler.DEFAULT_SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT;
+import static org.apache.hadoop.mapred.ShuffleHandler.DEFAULT_SHUFFLE_MANAGE_OS_CACHE;
+import static org.apache.hadoop.mapred.ShuffleHandler.DEFAULT_SHUFFLE_MAPOUTPUT_META_INFO_CACHE_SIZE;
+import static org.apache.hadoop.mapred.ShuffleHandler.DEFAULT_SHUFFLE_MAX_SESSION_OPEN_FILES;
+import static org.apache.hadoop.mapred.ShuffleHandler.DEFAULT_SHUFFLE_READAHEAD_BYTES;
+import static org.apache.hadoop.mapred.ShuffleHandler.DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED;
+import static org.apache.hadoop.mapred.ShuffleHandler.DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE;
+import static org.apache.hadoop.mapred.ShuffleHandler.MAX_SHUFFLE_CONNECTIONS;
+import static org.apache.hadoop.mapred.ShuffleHandler.SHUFFLE_BUFFER_SIZE;
+import static org.apache.hadoop.mapred.ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED;
+import static org.apache.hadoop.mapred.ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT;
+import static org.apache.hadoop.mapred.ShuffleHandler.SHUFFLE_MANAGE_OS_CACHE;
+import static org.apache.hadoop.mapred.ShuffleHandler.SHUFFLE_MAPOUTPUT_META_INFO_CACHE_SIZE;
+import static org.apache.hadoop.mapred.ShuffleHandler.SHUFFLE_MAX_SESSION_OPEN_FILES;
+import static org.apache.hadoop.mapred.ShuffleHandler.SHUFFLE_READAHEAD_BYTES;
+import static org.apache.hadoop.mapred.ShuffleHandler.SHUFFLE_TRANSFERTO_ALLOWED;
+import static org.apache.hadoop.mapred.ShuffleHandler.SUFFLE_SSL_FILE_BUFFER_SIZE_KEY;
+import static org.apache.hadoop.mapred.ShuffleHandler.WINDOWS_DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED;
+
+@SuppressWarnings("checkstyle:VisibilityModifier")
+public class ShuffleChannelHandlerContext {
+
+  public final Configuration conf;
+  public final JobTokenSecretManager secretManager;
+  public final Map<String, String> userRsrc;
+  public final LoadingCache<ShuffleHandler.AttemptPathIdentifier,
+      ShuffleHandler.AttemptPathInfo> pathCache;
+  public final IndexCache indexCache;
+  public final ShuffleHandler.ShuffleMetrics metrics;
+  public final ChannelGroup allChannels;
+
+
+  public final boolean connectionKeepAliveEnabled;
+  public final int sslFileBufferSize;
+  public final int connectionKeepAliveTimeOut;
+  public final int mapOutputMetaInfoCacheSize;
+
+  public final AtomicInteger activeConnections = new AtomicInteger();
+
+  /**
+   * Should the shuffle use posix_fadvise calls to manage the OS cache during
+   * sendfile.
+   */
+  public final boolean manageOsCache;
+  public final int readaheadLength;
+  public final int maxShuffleConnections;
+  public final int shuffleBufferSize;
+  public final boolean shuffleTransferToAllowed;
+  public final int maxSessionOpenFiles;
+  public final ReadaheadPool readaheadPool = ReadaheadPool.getInstance();
+
+  public int port = -1;
+
+  public ShuffleChannelHandlerContext(Configuration conf,
+                                      Map<String, String> userRsrc,
+                                      JobTokenSecretManager secretManager,
+                                      LoadingCache<ShuffleHandler.AttemptPathIdentifier,
+                                          ShuffleHandler.AttemptPathInfo> pathCache,
+                                      IndexCache indexCache,
+                                      ShuffleHandler.ShuffleMetrics metrics,
+                                      ChannelGroup allChannels) {
+    this.conf = conf;
+    this.userRsrc = userRsrc;
+    this.secretManager = secretManager;
+    this.pathCache = pathCache;
+    this.indexCache = indexCache;
+    this.metrics = metrics;
+    this.allChannels = allChannels;
+
+    sslFileBufferSize = conf.getInt(SUFFLE_SSL_FILE_BUFFER_SIZE_KEY,
+        DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE);
+    connectionKeepAliveEnabled =
+        conf.getBoolean(SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED,
+            DEFAULT_SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED);
+    connectionKeepAliveTimeOut =
+        Math.max(1, conf.getInt(SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT,
+            DEFAULT_SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT));
+    mapOutputMetaInfoCacheSize =
+        Math.max(1, conf.getInt(SHUFFLE_MAPOUTPUT_META_INFO_CACHE_SIZE,
+            DEFAULT_SHUFFLE_MAPOUTPUT_META_INFO_CACHE_SIZE));
+
+    manageOsCache = conf.getBoolean(SHUFFLE_MANAGE_OS_CACHE,
+        DEFAULT_SHUFFLE_MANAGE_OS_CACHE);
+
+    readaheadLength = conf.getInt(SHUFFLE_READAHEAD_BYTES,
+        DEFAULT_SHUFFLE_READAHEAD_BYTES);
+
+    maxShuffleConnections = conf.getInt(MAX_SHUFFLE_CONNECTIONS,
+        DEFAULT_MAX_SHUFFLE_CONNECTIONS);
+
+    shuffleBufferSize = conf.getInt(SHUFFLE_BUFFER_SIZE,
+        DEFAULT_SHUFFLE_BUFFER_SIZE);
+
+    shuffleTransferToAllowed = conf.getBoolean(SHUFFLE_TRANSFERTO_ALLOWED,
+        (Shell.WINDOWS)?WINDOWS_DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED:
+            DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED);
+
+    maxSessionOpenFiles = conf.getInt(SHUFFLE_MAX_SESSION_OPEN_FILES,
+        DEFAULT_SHUFFLE_MAX_SESSION_OPEN_FILES);
+  }
+
+  void setPort(int port) {
+    this.port = port;
+  }
+}
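
The context is a plain value object: it snapshots the keep-alive, buffer, readahead, and transferTo settings from the Configuration once at construction time, so the channel handlers never re-read configuration on the hot path. Wiring one up follows the same shape as the test setup later in this change; a minimal sketch, assuming it lives in org.apache.hadoop.mapred and that the attempt-path cache is supplied by the caller:

package org.apache.hadoop.mapred;

import java.util.concurrent.ConcurrentHashMap;

import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.thirdparty.com.google.common.cache.LoadingCache;

final class ContextWiringSketch {
  static ShuffleChannelHandlerContext newContext(
      Configuration conf,
      LoadingCache<ShuffleHandler.AttemptPathIdentifier,
          ShuffleHandler.AttemptPathInfo> pathCache) {
    MetricsSystem ms = DefaultMetricsSystem.instance();
    // setPort() is called separately, once the server socket is bound.
    return new ShuffleChannelHandlerContext(
        conf,
        new ConcurrentHashMap<>(),           // jobId -> user mapping
        new JobTokenSecretManager(),
        pathCache,                           // attempt-path cache, caller-supplied
        new IndexCache(new JobConf(conf)),
        ms.register(new ShuffleHandler.ShuffleMetrics()),
        new DefaultChannelGroup(GlobalEventExecutor.INSTANCE));
  }
}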

+ 74 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleChannelInitializer.java

@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.http.HttpObjectAggregator;
+import io.netty.handler.codec.http.HttpServerCodec;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.handler.stream.ChunkedWriteHandler;
+
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+
+import org.apache.hadoop.security.ssl.SSLFactory;
+
+import static org.apache.hadoop.mapred.ShuffleHandler.TIMEOUT_HANDLER;
+import static org.apache.hadoop.mapred.ShuffleHandler.LOG;
+
+public class ShuffleChannelInitializer extends ChannelInitializer<SocketChannel> {
+
+  public static final int MAX_CONTENT_LENGTH = 1 << 16;
+
+  private final ShuffleChannelHandlerContext handlerContext;
+  private final SSLFactory sslFactory;
+
+
+  public ShuffleChannelInitializer(ShuffleChannelHandlerContext ctx, SSLFactory sslFactory) {
+    this.handlerContext = ctx;
+    this.sslFactory = sslFactory;
+  }
+
+  @Override
+  public void initChannel(SocketChannel ch) throws GeneralSecurityException, IOException {
+    LOG.debug("ShuffleChannelInitializer init; channel='{}'", ch.id());
+
+    ChannelPipeline pipeline = ch.pipeline();
+    if (sslFactory != null) {
+      pipeline.addLast("ssl", new SslHandler(sslFactory.createSSLEngine()));
+    }
+    pipeline.addLast("http", new HttpServerCodec());
+    pipeline.addLast("aggregator", new HttpObjectAggregator(MAX_CONTENT_LENGTH));
+    pipeline.addLast("chunking", new ChunkedWriteHandler());
+
+    // An EventExecutorGroup could be specified to run in a
+    // different thread than an I/O thread so that the I/O thread
+    // is not blocked by a time-consuming task:
+    // https://netty.io/4.1/api/io/netty/channel/ChannelPipeline.html
+    pipeline.addLast("shuffle", new ShuffleChannelHandler(handlerContext));
+
+    pipeline.addLast(TIMEOUT_HANDLER,
+        new ShuffleHandler.TimeoutHandler(handlerContext.connectionKeepAliveTimeOut));
+    // TODO factor security manager into pipeline
+    // TODO factor out encode/decode to permit binary shuffle
+    // TODO factor out decode of index to permit alt. models
+  }
+}
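
This initializer is the piece the refactored ShuffleHandler hands to its Netty ServerBootstrap: every accepted socket gets the optional SSL handler, the HTTP codec and aggregator, the chunked writer, the shuffle handler, and finally the timeout handler. A rough sketch of that server-side wiring, with event-loop sizing and names chosen for illustration rather than copied from ShuffleHandler (it assumes the same package, so the package-private setPort() is reachable):

package org.apache.hadoop.mapred;

import java.net.InetSocketAddress;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

import org.apache.hadoop.security.ssl.SSLFactory;

final class BootstrapSketch {
  static Channel start(ShuffleChannelHandlerContext handlerContext,
                       SSLFactory sslFactory, int port) throws InterruptedException {
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    ServerBootstrap bootstrap = new ServerBootstrap()
        .group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        // sslFactory may be null when encrypted shuffle is disabled
        .childHandler(new ShuffleChannelInitializer(handlerContext, sslFactory));
    Channel ch = bootstrap.bind(port).sync().channel();
    // record the actual bound port on the shared handler context
    handlerContext.setPort(((InetSocketAddress) ch.localAddress()).getPort());
    return ch;
  }
}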

File diff suppressed because it is too large
+ 119 - 913
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java


+ 562 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleChannelHandler.java

@@ -0,0 +1,562 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.FileRegion;
+import io.netty.channel.embedded.EmbeddedChannel;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.handler.codec.MessageToMessageEncoder;
+import io.netty.handler.codec.http.DefaultFullHttpRequest;
+import io.netty.handler.codec.http.DefaultHttpResponse;
+import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpMethod;
+import io.netty.handler.codec.http.HttpObjectAggregator;
+import io.netty.handler.codec.http.HttpResponseDecoder;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpServerCodec;
+import io.netty.handler.codec.http.HttpUtil;
+import io.netty.handler.codec.http.HttpVersion;
+import io.netty.handler.codec.http.LastHttpContent;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.handler.stream.ChunkedWriteHandler;
+import io.netty.util.concurrent.GlobalEventExecutor;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.nio.charset.StandardCharsets;
+import java.security.cert.X509Certificate;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+import javax.crypto.SecretKey;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.X509TrustManager;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.thirdparty.com.google.common.base.Charsets;
+import org.eclipse.jetty.http.HttpHeader;
+import org.junit.Test;
+import org.slf4j.LoggerFactory;
+
+import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH;
+import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE;
+import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+import static org.apache.hadoop.mapred.ShuffleChannelHandler.shuffleHeaderToBytes;
+import static org.apache.hadoop.mapred.ShuffleChannelInitializer.MAX_CONTENT_LENGTH;
+import static org.apache.hadoop.mapred.ShuffleHandler.CONNECTION_CLOSE;
+import static org.apache.hadoop.mapred.ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED;
+import static org.apache.hadoop.mapred.ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT;
+import static org.apache.hadoop.mapred.ShuffleHandler.TIMEOUT_HANDLER;
+import static org.apache.hadoop.mapreduce.security.SecureShuffleUtils.HTTP_HEADER_URL_HASH;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestShuffleChannelHandler extends TestShuffleHandlerBase {
+  private static final org.slf4j.Logger LOG =
+      LoggerFactory.getLogger(TestShuffleChannelHandler.class);
+
+  @Test
+  public void testGetMapsFileRegion() throws IOException {
+    final ShuffleTest t = createShuffleTest();
+    final EmbeddedChannel shuffle = t.createShuffleHandlerChannelFileRegion();
+    t.testGetAllAttemptsForReduce0NoKeepAlive(shuffle.outboundMessages(), shuffle);
+  }
+
+  @Test
+  public void testGetMapsChunkedFileSSL() throws Exception {
+    final ShuffleTest t = createShuffleTest();
+    final LinkedList<Object> unencryptedMessages = new LinkedList<>();
+    final EmbeddedChannel shuffle = t.createShuffleHandlerSSL(unencryptedMessages);
+    t.testGetAllAttemptsForReduce0NoKeepAlive(unencryptedMessages, shuffle);
+  }
+
+  @Test
+  public void testKeepAlive() throws Exception {
+    // TODO: problems with keep-alive
+    // current behaviour:
+    //  a) mapreduce.shuffle.connection-keep-alive.enable=false
+    //     + client request with &keepAlive=true
+    //     ==> connection is kept
+    //  b) mapreduce.shuffle.connection-keep-alive.enable=true
+    //     ==> connection is kept
+    //
+    // a) seems like a bug
+    // b) might be ok, because it's the default in HTTP/1.1
+    Configuration conf = new Configuration();
+    conf.set(SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED, "false");
+    conf.set(SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT, "15");
+    final ShuffleTest t = createShuffleTest(conf);
+    final EmbeddedChannel shuffle = t.createShuffleHandlerChannelFileRegion();
+    t.testKeepAlive(shuffle.outboundMessages(), shuffle);
+  }
+
+  @Test
+  public void testKeepAliveSSL() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED, "false");
+    conf.set(SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT, "15");
+    final ShuffleTest t = createShuffleTest(conf);
+    final LinkedList<Object> unencryptedMessages = new LinkedList<>();
+    final EmbeddedChannel shuffle = t.createShuffleHandlerSSL(unencryptedMessages);
+    t.testKeepAlive(unencryptedMessages, shuffle);
+  }
+
+  @Test
+  public void testKeepAliveTimeout() throws InterruptedException, IOException {
+    Configuration conf = new Configuration();
+    conf.set(SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED, "true");
+    conf.set(SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT, "1");
+    final ShuffleTest t = createShuffleTest(conf);
+    final EmbeddedChannel shuffle = t.createShuffleHandlerChannelFileRegion();
+
+    FullHttpRequest req = t.createRequest(getUri(TEST_JOB_ID, 0,
+        Collections.singletonList(TEST_ATTEMPT_1), true));
+    shuffle.writeInbound(req);
+    t.assertResponse(shuffle.outboundMessages(),
+        t.getExpectedHttpResponse(req, true, 46),
+        t.getAttemptData(new Attempt(TEST_ATTEMPT_1, TEST_DATA_A))
+    );
+    assertTrue("keep-alive", shuffle.isActive());
+
+    TimeUnit.SECONDS.sleep(3);
+    shuffle.runScheduledPendingTasks();
+
+    assertFalse("closed", shuffle.isActive());
+  }
+
+  @Test
+  public void testIncompatibleShuffleVersion() {
+    Configuration conf = new Configuration();
+    conf.set(SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED, "true");
+    final ShuffleTest t = createShuffleTest(conf);
+    final EmbeddedChannel shuffle = t.createShuffleHandlerChannelFileRegion();
+    FullHttpRequest req = t.createRequest(getUri(TEST_JOB_ID, 0,
+        Collections.singletonList(TEST_ATTEMPT_1), true));
+    req.headers().set(ShuffleHeader.HTTP_HEADER_NAME, "invalid");
+    shuffle.writeInbound(req);
+
+    final EmbeddedChannel decoder = t.createHttpResponseChannel();
+    for (Object obj : shuffle.outboundMessages()) {
+      decoder.writeInbound(obj);
+    }
+    DefaultHttpResponse actual = decoder.readInbound();
+    assertFalse(actual.headers().get(CONTENT_LENGTH).isEmpty());
+    actual.headers().set(CONTENT_LENGTH, 0);
+
+    assertEquals(getExpectedHttpResponse(HttpResponseStatus.BAD_REQUEST).toString(),
+        actual.toString());
+
+    assertFalse("closed", shuffle.isActive()); // known-issue
+  }
+
+  @Test
+  public void testInvalidMapNoIndexFile() {
+    final ShuffleTest t = createShuffleTest();
+    final EmbeddedChannel shuffle = t.createShuffleHandlerChannelFileRegion();
+    FullHttpRequest req = t.createRequest(getUri(TEST_JOB_ID, 0,
+        Arrays.asList(TEST_ATTEMPT_1, "non-existing"), true));
+    shuffle.writeInbound(req);
+
+    final EmbeddedChannel decoder = t.createHttpResponseChannel();
+    for (Object obj : shuffle.outboundMessages()) {
+      decoder.writeInbound(obj);
+    }
+
+    DefaultHttpResponse actual = decoder.readInbound();
+    assertFalse(actual.headers().get(CONTENT_LENGTH).isEmpty());
+    actual.headers().set(CONTENT_LENGTH, 0);
+
+    assertEquals(getExpectedHttpResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR).toString(),
+        actual.toString());
+
+    assertFalse("closed", shuffle.isActive());
+  }
+
+  @Test
+  public void testInvalidMapNoDataFile() {
+    final ShuffleTest t = createShuffleTest();
+    final EmbeddedChannel shuffle = t.createShuffleHandlerChannelFileRegion();
+
+    String dataFile = getDataFile(tempDir.toAbsolutePath().toString(), TEST_ATTEMPT_2);
+    assertTrue("should delete", new File(dataFile).delete());
+
+    FullHttpRequest req = t.createRequest(getUri(TEST_JOB_ID, 0,
+        Arrays.asList(TEST_ATTEMPT_1, TEST_ATTEMPT_2), false));
+    shuffle.writeInbound(req);
+
+    final EmbeddedChannel decoder = t.createHttpResponseChannel();
+    for (Object obj : shuffle.outboundMessages()) {
+      decoder.writeInbound(obj);
+    }
+
+    DefaultHttpResponse actual = decoder.readInbound();
+    assertFalse(actual.headers().get(CONTENT_LENGTH).isEmpty());
+    actual.headers().set(CONTENT_LENGTH, 0);
+
+    assertEquals(getExpectedHttpResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR).toString(),
+        actual.toString());
+
+    assertFalse("closed", shuffle.isActive());
+  }
+
+  private DefaultHttpResponse getExpectedHttpResponse(HttpResponseStatus status) {
+    DefaultHttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
+    response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");
+    response.headers().set(ShuffleHeader.HTTP_HEADER_NAME,
+        ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+    response.headers().set(ShuffleHeader.HTTP_HEADER_VERSION,
+        ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
+    response.headers().set(CONTENT_LENGTH, 0);
+    return response;
+  }
+
+  private ShuffleTest createShuffleTest() {
+    return createShuffleTest(new Configuration());
+  }
+
+  private ShuffleTest createShuffleTest(Configuration conf) {
+    return new ShuffleTest(conf);
+  }
+
+  private File getResourceFile(String resourceName) {
+    ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+    return new File(Objects.requireNonNull(classLoader.getResource(resourceName)).getFile());
+  }
+
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  static class Attempt {
+    final String id;
+    final String content;
+
+    Attempt(String attempt, String content) {
+      this.id = attempt;
+      this.content = content;
+    }
+  }
+
+  private class ShuffleTest {
+    private final ShuffleChannelHandlerContext ctx;
+    private final SecretKey shuffleSecretKey;
+
+    ShuffleTest(Configuration conf) {
+      JobConf jobConf = new JobConf(conf);
+      MetricsSystem ms = DefaultMetricsSystem.instance();
+      this.ctx = new ShuffleChannelHandlerContext(conf,
+          new ConcurrentHashMap<>(),
+          new JobTokenSecretManager(),
+          createLoadingCache(),
+          new IndexCache(jobConf),
+          ms.register(new ShuffleHandler.ShuffleMetrics()),
+          new DefaultChannelGroup(GlobalEventExecutor.INSTANCE)
+      );
+
+      JobTokenIdentifier tokenId = new JobTokenIdentifier(new Text(TEST_JOB_ID));
+      Token<JobTokenIdentifier> token = new Token<>(tokenId, ctx.secretManager);
+      shuffleSecretKey = JobTokenSecretManager.createSecretKey(token.getPassword());
+
+      ctx.userRsrc.put(TEST_JOB_ID, TEST_USER);
+      ctx.secretManager.addTokenForJob(TEST_JOB_ID, token);
+    }
+
+    public FullHttpRequest createRequest(String uri) {
+      FullHttpRequest request =
+          new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri);
+      request.headers().set(ShuffleHeader.HTTP_HEADER_NAME,
+          ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+      request.headers().set(ShuffleHeader.HTTP_HEADER_VERSION,
+          ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
+      try {
+        String msgToEncode = SecureShuffleUtils.buildMsgFrom(new URL("http", "", ctx.port, uri));
+        request.headers().set(HTTP_HEADER_URL_HASH,
+            SecureShuffleUtils.hashFromString(msgToEncode, shuffleSecretKey));
+      } catch (IOException e) {
+        e.printStackTrace();
+        fail("Could not create URL hash for test request");
+      }
+
+      return request;
+    }
+
+    public DefaultHttpResponse getExpectedHttpResponse(
+        FullHttpRequest request, boolean keepAlive, long contentLength) {
+      DefaultHttpResponse response =
+          new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
+      HttpHeaders headers = response.headers();
+      try {
+        SecretKey tokenSecret = ctx.secretManager.retrieveTokenSecret(TEST_JOB_ID);
+        headers.set(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH,
+            SecureShuffleUtils.generateHash(
+                request.headers().get(HTTP_HEADER_URL_HASH).getBytes(Charsets.UTF_8),
+                tokenSecret));
+      } catch (SecretManager.InvalidToken e) {
+        fail("Could not generate reply hash");
+      }
+      headers.set(ShuffleHeader.HTTP_HEADER_NAME, ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+      headers.set(ShuffleHeader.HTTP_HEADER_VERSION, ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
+      if (keepAlive) {
+        headers.set(HttpHeader.CONNECTION.asString(), HttpHeader.KEEP_ALIVE.asString());
+        headers.set(HttpHeader.KEEP_ALIVE.asString(), "timeout=" + ctx.connectionKeepAliveTimeOut);
+      } else {
+        response.headers().set(HttpHeader.CONNECTION.asString(), CONNECTION_CLOSE);
+      }
+      HttpUtil.setContentLength(response, contentLength);
+      return response;
+    }
+
+    private void testGetAllAttemptsForReduce0NoKeepAlive(
+        java.util.Queue<Object> outboundMessages, EmbeddedChannel shuffle) throws IOException {
+      final FullHttpRequest request = createRequest(
+          getUri(TEST_JOB_ID, 0,
+              Arrays.asList(TEST_ATTEMPT_1, TEST_ATTEMPT_2, TEST_ATTEMPT_3), false));
+      shuffle.writeInbound(request);
+      assertResponse(outboundMessages,
+          getExpectedHttpResponse(request, false, 138),
+          getAllAttemptsForReduce0()
+      );
+      assertFalse("no keep-alive", shuffle.isActive());
+    }
+
+    private void testKeepAlive(java.util.Queue<Object> messages,
+                               EmbeddedChannel shuffle) throws IOException {
+      final FullHttpRequest req1 = createRequest(
+          getUri(TEST_JOB_ID, 0, Collections.singletonList(TEST_ATTEMPT_1), true));
+      shuffle.writeInbound(req1);
+      assertResponse(messages,
+          getExpectedHttpResponse(req1, true, 46),
+          getAttemptData(new Attempt(TEST_ATTEMPT_1, TEST_DATA_A))
+      );
+      assertTrue("keep-alive", shuffle.isActive());
+      messages.clear();
+
+      final FullHttpRequest req2 = createRequest(
+          getUri(TEST_JOB_ID, 0, Collections.singletonList(TEST_ATTEMPT_2), true));
+      shuffle.writeInbound(req2);
+      assertResponse(messages,
+          getExpectedHttpResponse(req2, true, 46),
+          getAttemptData(new Attempt(TEST_ATTEMPT_2, TEST_DATA_B))
+      );
+      assertTrue("keep-alive", shuffle.isActive());
+      messages.clear();
+
+      final FullHttpRequest req3 = createRequest(
+          getUri(TEST_JOB_ID, 0, Collections.singletonList(TEST_ATTEMPT_3), false));
+      shuffle.writeInbound(req3);
+      assertResponse(messages,
+          getExpectedHttpResponse(req3, false, 46),
+          getAttemptData(new Attempt(TEST_ATTEMPT_3, TEST_DATA_C))
+      );
+      assertFalse("no keep-alive", shuffle.isActive());
+    }
+
+    private ArrayList<ByteBuf> getAllAttemptsForReduce0() throws IOException {
+      return getAttemptData(
+          new Attempt(TEST_ATTEMPT_1, TEST_DATA_A),
+          new Attempt(TEST_ATTEMPT_2, TEST_DATA_B),
+          new Attempt(TEST_ATTEMPT_3, TEST_DATA_C)
+      );
+    }
+
+    private ArrayList<ByteBuf> getAttemptData(Attempt... attempts) throws IOException {
+      ArrayList<ByteBuf> data = new ArrayList<>();
+      for (Attempt attempt : attempts) {
+        data.add(shuffleHeaderToBytes(new ShuffleHeader(attempt.id, attempt.content.length(),
+            attempt.content.length() * 2L, 0)));
+        data.add(Unpooled.copiedBuffer(attempt.content.getBytes(StandardCharsets.UTF_8)));
+      }
+      return data;
+    }
+
+    private void assertResponse(java.util.Queue<Object> outboundMessages,
+                                DefaultHttpResponse response,
+                                List<ByteBuf> content) {
+      final EmbeddedChannel decodeChannel = createHttpResponseChannel();
+
+      content.add(LastHttpContent.EMPTY_LAST_CONTENT.content());
+
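+      // The first outbound message must decode to the expected HTTP response;
+      // each following message must match the expected content buffer byte for
+      // byte, ending with the empty last-content marker.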
+      int i = 0;
+      for (Object outboundMessage : outboundMessages) {
+        ByteBuf actualBytes = ((ByteBuf) outboundMessage);
+        String actualHexdump = ByteBufUtil.prettyHexDump(actualBytes);
+        LOG.info("\n{}", actualHexdump);
+
+        decodeChannel.writeInbound(actualBytes);
+        Object obj = decodeChannel.readInbound();
+        LOG.info("Decoded object: {}", obj);
+
+        if (i == 0) {
+          DefaultHttpResponse resp = (DefaultHttpResponse) obj;
+          assertEquals(response.toString(), resp.toString());
+        }
+        if (i > 0 && i <= content.size()) {
+          assertEquals("data should match",
+              ByteBufUtil.prettyHexDump(content.get(i - 1)), actualHexdump);
+        }
+
+        i++;
+      }
+
+      // This check is done last so the per-message debug logs above are
+      // printed before a count mismatch fails the test.
+      assertEquals("all data should match", content.size() + 1, outboundMessages.size());
+    }
+
+    public EmbeddedChannel createShuffleHandlerChannelFileRegion() {
+      final EmbeddedChannel channel = createShuffleHandlerChannel();
+
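+      // The shuffle handler responds with FileRegions (zero-copy transfers).
+      // EmbeddedChannel has no real socket, so materialize each region into a
+      // ByteBuf that the HTTP decoder in assertResponse can parse.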
+      channel.pipeline().addFirst(
+          new MessageToMessageEncoder<FileRegion>() {
+            @Override
+            protected void encode(
+                ChannelHandlerContext cCtx, FileRegion msg, List<Object> out) throws Exception {
+              ByteArrayOutputStream stream = new ByteArrayOutputStream();
+              WritableByteChannel wbc = Channels.newChannel(stream);
+              // FileRegion.transferTo takes an offset relative to the start
+              // of the region; copy the whole region from offset 0.
+              msg.transferTo(wbc, 0);
+              out.add(Unpooled.wrappedBuffer(stream.toByteArray()));
+            }
+          }
+      );
+
+      return channel;
+    }
+
+    public EmbeddedChannel createSSLClient() throws Exception {
+      final EmbeddedChannel channel = createShuffleHandlerChannel();
+
+      SSLContext sc = SSLContext.getInstance("SSL");
+
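+      // Trust-all manager: the server uses a self-signed test certificate
+      // that no default trust store would accept.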
+      final TrustManager trm = new X509TrustManager() {
+        public X509Certificate[] getAcceptedIssuers() {
+          return null;
+        }
+
+        public void checkClientTrusted(X509Certificate[] certs, String authType) {
+        }
+
+        public void checkServerTrusted(X509Certificate[] certs, String authType) {
+        }
+      };
+
+      sc.init(null, new TrustManager[]{trm}, null);
+
+      final SSLEngine sslEngine = sc.createSSLEngine();
+      sslEngine.setUseClientMode(true);
+      channel.pipeline().addFirst("ssl", new SslHandler(sslEngine));
+
+      return channel;
+    }
+
+    public EmbeddedChannel createShuffleHandlerSSL(java.util.Queue<Object> unencryptedMessages)
+        throws Exception {
+      final EmbeddedChannel channel = createShuffleHandlerChannel();
+      // The test certificate was generated manually with:
+      //   openssl req -x509 -newkey rsa:4096 -keyout key.pem \
+      //     -out cert.pem -sha256 -days 3650 -nodes -subj '/CN=localhost'
+      // because Netty's "new SelfSignedCertificate()" fails here with:
+      //   Failed to generate a self-signed X.509 certificate using Bouncy Castle
+      final SslContext sslCtx = SslContextBuilder
+          .forServer(getResourceFile("cert.pem"), getResourceFile("key.pem"))
+          .build();
+      final SslHandler sslHandler = sslCtx.newHandler(ByteBufAllocator.DEFAULT);
+      channel.pipeline().addFirst("ssl", sslHandler);
+
+      channel.pipeline().addAfter("ssl", "unencrypted", new MessageToMessageEncoder<ByteBuf>() {
+        @Override
+        protected void encode(ChannelHandlerContext cCtx, ByteBuf msg, List<Object> out) {
+          unencryptedMessages.add(msg.copy());
+          out.add(msg.retain());
+        }
+      });
+
+      channel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
+        @Override
+        public void userEventTriggered(ChannelHandlerContext cCtx, Object evt) {
+          LOG.info("EVENT: {}", evt);
+        }
+      });
+
+      // The TLS handshake must complete before application data flows,
+      // otherwise the SslHandler buffers the messages. EmbeddedChannel has no
+      // real transport, so relay the handshake records between the client and
+      // server channels by hand.
+      final EmbeddedChannel client = createSSLClient();
+      for (Object obj : client.outboundMessages()) {
+        channel.writeInbound(obj);
+      }
+      client.outboundMessages().clear();
+      for (Object obj : channel.outboundMessages()) {
+        client.writeInbound(obj);
+      }
+      channel.outboundMessages().clear();
+      for (Object obj : client.outboundMessages()) {
+        channel.writeInbound(obj);
+      }
+      client.outboundMessages().clear();
+
+      return channel;
+    }
+
+    public EmbeddedChannel createShuffleHandlerChannel() {
+      final EmbeddedChannel channel = new EmbeddedChannel();
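+      // Assemble the same pipeline that ShuffleChannelInitializer builds for
+      // the real server, minus the optional SSL handler (the SSL tests add it
+      // separately).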
+      channel.pipeline().addLast("http", new HttpServerCodec());
+      channel.pipeline().addLast("aggregator", new HttpObjectAggregator(MAX_CONTENT_LENGTH));
+      channel.pipeline().addLast("chunking", new ChunkedWriteHandler());
+      channel.pipeline().addLast("shuffle", new ShuffleChannelHandler(ctx));
+      channel.pipeline().addLast(TIMEOUT_HANDLER,
+          new ShuffleHandler.TimeoutHandler(ctx.connectionKeepAliveTimeOut));
+      return channel;
+    }
+
+    public EmbeddedChannel createHttpResponseChannel() {
+      return new EmbeddedChannel(
+          new HttpResponseDecoder()
+      );
+    }
+  }
+}

File diff suppressed because it is too large
+ 30 - 1212
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java


+ 172 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandlerBase.java

@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import io.netty.util.ResourceLeakDetector;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.io.PrintWriter;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import javax.annotation.Nonnull;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.thirdparty.com.google.common.cache.CacheBuilder;
+import org.apache.hadoop.thirdparty.com.google.common.cache.CacheLoader;
+import org.apache.hadoop.thirdparty.com.google.common.cache.LoadingCache;
+import org.apache.hadoop.thirdparty.com.google.common.cache.RemovalListener;
+import org.junit.After;
+import org.junit.Before;
+
+import static io.netty.util.ResourceLeakDetector.Level.PARANOID;
+import static org.apache.hadoop.io.MapFile.DATA_FILE_NAME;
+import static org.apache.hadoop.io.MapFile.INDEX_FILE_NAME;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TestShuffleHandlerBase {
+  public static final String TEST_ATTEMPT_1 = "attempt_1111111111111_0001_m_000001_0";
+  public static final String TEST_ATTEMPT_2 = "attempt_1111111111111_0002_m_000002_0";
+  public static final String TEST_ATTEMPT_3 = "attempt_1111111111111_0003_m_000003_0";
+  public static final String TEST_JOB_ID = "job_1111111111111_0001";
+  public static final String TEST_USER = "testUser";
+  public static final String TEST_DATA_A = "aaaaa";
+  public static final String TEST_DATA_B = "bbbbb";
+  public static final String TEST_DATA_C = "ccccc";
+
+  private final PrintStream standardOut = System.out;
+  private final ByteArrayOutputStream outputStreamCaptor = new ByteArrayOutputStream();
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  protected java.nio.file.Path tempDir;
+
+  @Before
+  public void setup() throws IOException {
+    tempDir = Files.createTempDirectory("test-shuffle-channel-handler");
+    tempDir.toFile().deleteOnExit();
+
+    generateMapOutput(tempDir.toAbsolutePath().toString(), TEST_ATTEMPT_1,
+        Arrays.asList(TEST_DATA_A, TEST_DATA_B, TEST_DATA_C));
+    generateMapOutput(tempDir.toAbsolutePath().toString(), TEST_ATTEMPT_2,
+        Arrays.asList(TEST_DATA_B, TEST_DATA_A, TEST_DATA_C));
+    generateMapOutput(tempDir.toAbsolutePath().toString(), TEST_ATTEMPT_3,
+        Arrays.asList(TEST_DATA_C, TEST_DATA_B, TEST_DATA_A));
+
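+    // Capture stdout so teardown() can scan it for unreleased-ByteBuf reports
+    // from Netty's PARANOID leak detector.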
+    outputStreamCaptor.reset();
+    ResourceLeakDetector.setLevel(PARANOID);
+    System.setOut(new PrintStream(outputStreamCaptor));
+  }
+
+  @After
+  public void teardown() {
+    System.setOut(standardOut);
+    System.out.print(outputStreamCaptor);
+    // Detecting the leak warning only works with ch.qos.logback:logback-classic
+    // on the test classpath (see the test dependency added in pom.xml).
+    assertFalse(outputStreamCaptor.toString()
+        .contains("LEAK: ByteBuf.release() was not called before"));
+  }
+
+  public List<String> matchLogs(String pattern) {
+    String logs = outputStreamCaptor.toString();
+    Matcher m = Pattern.compile(pattern).matcher(logs);
+    List<String> allMatches = new ArrayList<>();
+    while (m.find()) {
+      allMatches.add(m.group());
+    }
+    return allMatches;
+  }
+
+  public static void generateMapOutput(String tempDir, String attempt, List<String> maps)
+      throws IOException {
+    SpillRecord record = new SpillRecord(maps.size());
+
+    assertTrue(new File(getBasePath(tempDir, attempt)).mkdirs());
+    try (PrintWriter writer = new PrintWriter(getDataFile(tempDir, attempt), "UTF-8")) {
+      long startOffset = 0;
+      int partition = 0;
+      for (String map : maps) {
+        record.putIndex(new IndexRecord(
+                startOffset,
+                map.length() * 2L, // doesn't matter in this test
+                map.length()),
+            partition);
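+        // NOTE: the +1 assumes a one-byte separator that writer.write(map)
+        // below never emits, so offsets for partitions > 0 are one byte off.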
+        startOffset += map.length() + 1;
+        partition++;
+        writer.write(map);
+      }
+      record.writeToFile(new Path(getIndexFile(tempDir, attempt)),
+          new JobConf(new Configuration()));
+    }
+  }
+
+  public static String getIndexFile(String tempDir, String attempt) {
+    return String.format("%s/%s", getBasePath(tempDir, attempt), INDEX_FILE_NAME);
+  }
+
+  public static String getDataFile(String tempDir, String attempt) {
+    return String.format("%s/%s", getBasePath(tempDir, attempt), DATA_FILE_NAME);
+  }
+
+  private static String getBasePath(String tempDir, String attempt) {
+    return String.format("%s/%s/%s/%s", tempDir, TEST_JOB_ID, TEST_USER, attempt);
+  }
+
+  public static String getUri(String jobId, int reduce, List<String> maps, boolean keepAlive) {
+    return String.format("/mapOutput?job=%s&reduce=%d&map=%s%s",
+        jobId, reduce, String.join(",", maps),
+        keepAlive ? "&keepAlive=true" : "");
+  }
+
+  public LoadingCache<ShuffleHandler.AttemptPathIdentifier,
+      ShuffleHandler.AttemptPathInfo> createLoadingCache() {
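+    // A stand-in for the ShuffleHandler path cache: resolves an attempt id to
+    // its index and data files under the test temp directory.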
+    return CacheBuilder.newBuilder()
+        .expireAfterAccess(5, TimeUnit.MINUTES)
+        .softValues()
+        .concurrencyLevel(16)
+        .removalListener(
+            (RemovalListener<ShuffleHandler.AttemptPathIdentifier,
+                ShuffleHandler.AttemptPathInfo>) notification -> {
+            })
+        .maximumWeight(10 * 1024 * 1024)
+        .weigher((key, value) -> key.jobId.length() + key.user.length() +
+            key.attemptId.length() +
+            value.indexPath.toString().length() +
+            value.dataPath.toString().length())
+        .build(new CacheLoader<ShuffleHandler.AttemptPathIdentifier,
+            ShuffleHandler.AttemptPathInfo>() {
+          @Override
+          public ShuffleHandler.AttemptPathInfo load(
+              @Nonnull ShuffleHandler.AttemptPathIdentifier key) {
+            String base = String.format("%s/%s/%s/", tempDir, key.jobId, key.user);
+            String attemptBase = base + key.attemptId;
+            Path indexFileName = new Path(attemptBase + "/" + INDEX_FILE_NAME);
+            Path mapOutputFileName = new Path(attemptBase + "/" + DATA_FILE_NAME);
+            return new ShuffleHandler.AttemptPathInfo(indexFileName, mapOutputFileName);
+          }
+        });
+  }
+}

+ 27 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/resources/cert.pem

@@ -0,0 +1,27 @@
+-----BEGIN CERTIFICATE-----
+MIIEpDCCAowCCQDDMEtH5Wp0qTANBgkqhkiG9w0BAQsFADAUMRIwEAYDVQQDDAls
+b2NhbGhvc3QwHhcNMjMwMTE2MTI0NjQ4WhcNMzMwMTEzMTI0NjQ4WjAUMRIwEAYD
+VQQDDAlsb2NhbGhvc3QwggIiMA0GCSqGSIb3DQEBAQUAA4ICDwAwggIKAoICAQDO
+FiF+sfoJYHPMPx4jaU11mCupytAFJzz9igaiaKAZCjVHBVWC31KDxHmRdKD066DO
+clOJORNOe8Oe4aB5Lbu6wgKtlHEtKmqAU2WrYAEl0oXrZKEL0Xgs1KTTChbVSJ/I
+m1WwmEthriQSul0WaEncNpS5NV4PORhiGu0plw+SWSJBFsbl29K6oHE1ClgVjm8j
+iu4Y1NAilOPcjmhCmwRq5eq5H0mJ5LWxfvjLIJ9cPpMLG9eVLQkOIE9I01DJ37WM
+OvljUMpmhxWDq2oZEmeyCJUFSUh1IlcUM1hTmRUzU/Vcf7EhpAYZxphvSIvDQkAw
+cmnn0LQZmORCMP0HurR1o3NnzAVf/ahfpXwvA/BuCsEcW1Le+WATtxa2EvRCnEPa
+I76W35FY69t/WYZNIzPgo9eYD7iDBbqxuBH+GlDuwWU6mjEc0nL11uGtcRPrXzKa
+QhRMqAtwNW5I5S5HgPLbMiu/n+PpX6+S431eLHFHJ6WUvcwOIK4ZqLH4/Piks1fV
+0Svdo47Jymlt6dOvYm85tFsWkYcmldO6aQilRuGBYdXJ06xDyH7EaD0Z2PmPjhl9
+zkt3gpaXxBn0gsJIn++qZ26pXFxVewlJi0m84Xd3x10h9MvpP8AZMhFkLWXR2nqw
+eCfell4jzGNXBDLEcspv6HmuTvP7+gqgRCuFLrjOiQIDAQABMA0GCSqGSIb3DQEB
+CwUAA4ICAQAexU5VTmT5VAqau0TGTGEgStGPWoliV4b+d8AcdJvOd1qmetgFhJ+X
+TSutcFPdascOys0+tUV2GZwew3P8yTQyd35LDUC4OjGt2kISDplGAtTdDD19u5/R
+hQf7VxJImOxsg2lPPRv3RXMbNF64reP9YsM9osWgJOFzrDf9FkP2HByslU2v7ler
+sWQVu+Ei7r3/ZMOI7hHnN8MLqcj+BJwEHCTa8HPmr0Ic3lJ86vUVR4QQE5LgNvSu
+oSOZlALsMNVx2rxmirhC6guLwPh7HylDFMzyVedCzmqwQ0R8SSU6SmJvXNLeBFLw
+F5mZRh1jabiqsMTGnmMQ97GPs0q78M2sw3TjI+nexCcYZ3jQfR+1eFSg4DlSd55x
+BMVfT2kYThzxOw3brtygXjl6gGr8v5M6PzOvbLyDtEa3iDp7Mslw2tJ5OmxxJV9g
+QVvBQL1L2nySFk0ij2bIjD7fdpF/EpxrNf4IATOAf5YvxELUeXnyuqJZBtgC8b3I
+qXHJIpGM7N4jdwhe0sMVH0OWlqzsL14QZCE6YdvXBEksJ/HBVUie6afYAZrUwUP1
+gtcq9uFpPteg9PsBLZ7hGfNt2278EXhPBtlIpeiPE8X19Lr3bCmCO1PbWNCTkweb
+tGfwnH46DmWYUqYrofnKso1mq56yEbbuDy7a2FeHJ2d+18Fh97WnUw==
+-----END CERTIFICATE-----

+ 52 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/resources/key.pem

@@ -0,0 +1,52 @@
+-----BEGIN PRIVATE KEY-----
+MIIJRAIBADANBgkqhkiG9w0BAQEFAASCCS4wggkqAgEAAoICAQDOFiF+sfoJYHPM
+Px4jaU11mCupytAFJzz9igaiaKAZCjVHBVWC31KDxHmRdKD066DOclOJORNOe8Oe
+4aB5Lbu6wgKtlHEtKmqAU2WrYAEl0oXrZKEL0Xgs1KTTChbVSJ/Im1WwmEthriQS
+ul0WaEncNpS5NV4PORhiGu0plw+SWSJBFsbl29K6oHE1ClgVjm8jiu4Y1NAilOPc
+jmhCmwRq5eq5H0mJ5LWxfvjLIJ9cPpMLG9eVLQkOIE9I01DJ37WMOvljUMpmhxWD
+q2oZEmeyCJUFSUh1IlcUM1hTmRUzU/Vcf7EhpAYZxphvSIvDQkAwcmnn0LQZmORC
+MP0HurR1o3NnzAVf/ahfpXwvA/BuCsEcW1Le+WATtxa2EvRCnEPaI76W35FY69t/
+WYZNIzPgo9eYD7iDBbqxuBH+GlDuwWU6mjEc0nL11uGtcRPrXzKaQhRMqAtwNW5I
+5S5HgPLbMiu/n+PpX6+S431eLHFHJ6WUvcwOIK4ZqLH4/Piks1fV0Svdo47Jymlt
+6dOvYm85tFsWkYcmldO6aQilRuGBYdXJ06xDyH7EaD0Z2PmPjhl9zkt3gpaXxBn0
+gsJIn++qZ26pXFxVewlJi0m84Xd3x10h9MvpP8AZMhFkLWXR2nqweCfell4jzGNX
+BDLEcspv6HmuTvP7+gqgRCuFLrjOiQIDAQABAoICAQDAe6UfK2YIugCN5OnmUyUY
+z18AwD/YgFSTzgXyTNwzZvhp9A5xJNpx3eFZvN/Uwfs4t0lUom1o4WnNjJkQdWmg
+vjI4I6wtbi942evcy9dmlyGjwSI14phm7tlfj03SOXmbqZG4VhYaDsb8gvoMwq0x
+s/zmm3TVrRMcFmAqd0ABBaVbu8VbzRweWVpDGv04bQda4BkQMjyQABZu2seAZj8T
+BNldvF44H9igBqKjPj35rywxtPh/CUgq3HyQ3WXYl0x+xFpHq57Pch3jFAgNkMYv
+X5qoDFFTrhY89NPriNBnV2SU12L+s69LBdU8Izr+zXMcjNBjxudf/RA8znqWbIi8
+pbwXOwBUD4XP3coAzipVOJfeXb5OAkq+wjHnKb4YXJ5mNFb7LcRy6MJVqyxPNJGh
+UlfGxsJ441K/9e+aoDEyB0xbjeZ+2yP021P2eObwj46M5sxP2BFSe8E1DUpQ5+ZX
+kKipKDZETLc2e4G37Hziw2Wa5X0AAbKgSh1a5AMd0GUbrmJQzO0dok1ujJNu+zpn
+K0qAG0c/HD+egIKPEc03+81fLzXKxGHOxrTnHPInWLeGjxEL3oM2Tl5QkYSjm8qg
+uIY5up5K//R+fDy45/XRACPOo+yf2RTFnIjfIhxJaB2M7BrBUpWvX1xLJQfDS3Gb
+4Rfo2Qlgh/adrNkr2m0NHQKCAQEA8KZK7eugKV/Gk5L0j5E59qytlVZTUoDWdbAq
+vMnAgU6BGiTy/Av4jPCH5HDYD5ZX5nqD+GVkXSh2fiT8eSpgtIuBEdeiHHZXwCcb
+IK7vKxSePQrs0aA53uk7LY0LGPMRhDheYTItTE+6pRp2HswDgRBw+1fm6Yt1ps32
+oqM7bOUSg6eCKISmAP8UV9ac1l6ZHLdhTIzrVnOc/YqIW4ruNbwpSK1fI7uTWH4i
+5JqfPtTa7anJrt080vVUi6cS22G8QhlW3q6fo1GrH8QM4gInRF/4MwkAAP8p1787
+KlFHXxS0bWnJZhjKvh7naeVJi5EaMCWJ1gKF/OcvQYONrA6zdwKCAQEA2ztvxaPy
+j4Pq2wpYWlHueCPPn5yMDQQqCLlcG50HzPbquSdZe7o0MEWqV8MaXB6fw1xLwCC4
+i5+HnL72KaBu6DVIhMYDmPzhiw4GbCASfR4v/biQ+047KfnQiHPUEhUCxGvHhcDR
+Y3Zhzax6mC79Mfz2gunEx2ZI1MURn/sO+3tQtx+Gzsoj/W4LHpeEQGKsUhcIN48v
+XAmeWqVwwmr0twQygnOQyvgZxtiunnIADVxJJh4AQLWGagDiMjaWJ4fZ7q8aBMLY
+SFBlDqzf5xssraUAiaawsaRL0jliy0y8WXwSJHb4WKebH2QQcUq22c2y8IbKMcsz
+AjLHf1nG0oEN/wKCAQEAypfkutnEEzDbVz+/feIQPzfuRqvPJ8TpR1jREfBzjSiP
+DXiPy1s0m0qfzNSnQLAcPh9kXMjXLooy/02Z81OW6EgNl/zGMFn80/rYHrLHDKil
+8tPwvSW7sor9VALKle2EEKD367G3augwRHC7gn/Ub2JtC1xcPL84g/4fJZpwG+PZ
+q1ZpAD10F6Cm+q/lh59KHV/QnQaB1V0tjFGFLDQRCNHom5PBZa6zhCtkqrn1WIsP
+6EcpUHpWi28YBx3XhTOJrsfwVzYBlRfbDboZ8mdHsYttw2FPTIeELWqDn8OfZ09h
++j6126sBe/8+aOsr+EBnIKNpn+6t6FSkdu4OZgxWTwKCAQEAxjRXWjVeUBgxFfWl
+aYsXcXDW/nP3PrFcG2UR/DkdW1aFYf5MbYssMdRaLFxNEanyQRrJtZsfncQORE11
+mq7cdn3t4XRwvjkq5KA6eDkK3imTs+zQzxOsc8fSm/s0aWCrjs/upGNuK2FVDTD5
+6WraKP6OFE+rQ6ebAxpkU+IUapLTp6wOIhkpLq/1x6OuwtMy/kiqeiiu5aQgkc1v
+Q6aVNn3J+Jzo9EgYbelq/f8NQwcDbz3Cdr5nFqFT35fway7sflm6yUErbz2YEAuF
+ppiv7RH3iXu11fU3Q4n0Yt8ujiyY7nTNFMH7ggbiwrqM1B+fvsvuM9SFemBUczPE
+iH57GwKCAQAdLm1mSeUPn3qEXy/ui7M7GPK43r1l2fn6UJhEGckm4YJ2DOlWisNW
+2ilyzfdlYF1Cq22iKxi3/mZdNojKKL7yFCTwx2evHsSIt2vcyD25sFVh5u9O/xFa
+1Zk3Pzq6XpaAfZCY4OizJb5zraWYWVNAP1DI4qT0Kg6LvNWZ5G9Dh+tptTmB9E05
+5GiBWD3OfWH5AMQ2UmprEivbaqN8Gm/W6m6Hraf+LbP4aFORwElNAZTymeNcW5O5
+ha2XU2TAINmhgPm1IZEGiSah+A+s2uW4Ox4nQJfksy+rtJOPRcnK4aIhURhzwJv/
+8JszrQ2Tq9fN/cO50CDeipqAtKkcWNjE
+-----END PRIVATE KEY-----

+ 2 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/resources/log4j.properties

@@ -17,5 +17,5 @@ log4j.threshold=ALL
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} (%F:%M(%L)) - %m%n
-log4j.logger.io.netty=INFO
-log4j.logger.org.apache.hadoop.mapred=INFO
+log4j.logger.io.netty=TRACE
+log4j.logger.org.apache.hadoop.mapred=TRACE

Some files were not shown because too many files changed in this diff