|
@@ -49,6 +49,7 @@ import java.util.concurrent.ConcurrentHashMap;
|
|
|
import java.util.concurrent.Executors;
|
|
|
import java.util.concurrent.ThreadFactory;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.concurrent.atomic.AtomicInteger;
|
|
|
import java.util.regex.Pattern;
|
|
|
|
|
|
import javax.crypto.SecretKey;
|
|
@@ -170,6 +171,7 @@ public class ShuffleHandler extends AuxiliaryService {
|
|
|
private int maxShuffleConnections;
|
|
|
private int shuffleBufferSize;
|
|
|
private boolean shuffleTransferToAllowed;
|
|
|
+ private int maxSessionOpenFiles;
|
|
|
private ReadaheadPool readaheadPool = ReadaheadPool.getInstance();
|
|
|
|
|
|
private Map<String,String> userRsrc;
|
|
@@ -220,6 +222,13 @@ public class ShuffleHandler extends AuxiliaryService {
|
|
|
public static final boolean WINDOWS_DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED =
|
|
|
false;
|
|
|
|
|
|
+ /* the maximum number of files a single GET request can
|
|
|
+ open simultaneously during shuffle
|
|
|
+ */
|
|
|
+ public static final String SHUFFLE_MAX_SESSION_OPEN_FILES =
|
|
|
+ "mapreduce.shuffle.max.session-open-files";
|
|
|
+ public static final int DEFAULT_SHUFFLE_MAX_SESSION_OPEN_FILES = 3;
|
|
|
+
|
|
|
boolean connectionKeepAliveEnabled = false;
|
|
|
int connectionKeepAliveTimeOut;
|
|
|
int mapOutputMetaInfoCacheSize;
|
|
@@ -248,6 +257,104 @@ public class ShuffleHandler extends AuxiliaryService {
|
|
|
|
|
|
final ShuffleMetrics metrics;
|
|
|
|
|
|
+ class ReduceMapFileCount implements ChannelFutureListener {
|
|
|
+
|
|
|
+ private ReduceContext reduceContext;
|
|
|
+
|
|
|
+ public ReduceMapFileCount(ReduceContext rc) {
|
|
|
+ this.reduceContext = rc;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void operationComplete(ChannelFuture future) throws Exception {
|
|
|
+ if (!future.isSuccess()) {
|
|
|
+ future.getChannel().close();
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ int waitCount = this.reduceContext.getMapsToWait().decrementAndGet();
|
|
|
+ if (waitCount == 0) {
|
|
|
+ metrics.operationComplete(future);
|
|
|
+ future.getChannel().close();
|
|
|
+ } else {
|
|
|
+ pipelineFact.getSHUFFLE().sendMap(reduceContext);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Maintain parameters per messageReceived() Netty context.
|
|
|
+ * Allows sendMapOutput calls from operationComplete()
|
|
|
+ */
|
|
|
+ private static class ReduceContext {
|
|
|
+
|
|
|
+ private List<String> mapIds;
|
|
|
+ private AtomicInteger mapsToWait;
|
|
|
+ private AtomicInteger mapsToSend;
|
|
|
+ private int reduceId;
|
|
|
+ private ChannelHandlerContext ctx;
|
|
|
+ private String user;
|
|
|
+ private Map<String, Shuffle.MapOutputInfo> infoMap;
|
|
|
+ private String outputBasePathStr;
|
|
|
+
|
|
|
+ public ReduceContext(List<String> mapIds, int rId,
|
|
|
+ ChannelHandlerContext context, String usr,
|
|
|
+ Map<String, Shuffle.MapOutputInfo> mapOutputInfoMap,
|
|
|
+ String outputBasePath) {
|
|
|
+
|
|
|
+ this.mapIds = mapIds;
|
|
|
+ this.reduceId = rId;
|
|
|
+ /**
|
|
|
+ * Atomic count for tracking the no. of map outputs that are yet to
|
|
|
+ * complete. Multiple futureListeners' operationComplete() can decrement
|
|
|
+ * this value asynchronously. It is used to decide when the channel should
|
|
|
+ * be closed.
|
|
|
+ */
|
|
|
+ this.mapsToWait = new AtomicInteger(mapIds.size());
|
|
|
+ /**
|
|
|
+ * Atomic count for tracking the no. of map outputs that have been sent.
|
|
|
+ * Multiple sendMap() calls can increment this value
|
|
|
+ * asynchronously. Used to decide which mapId should be sent next.
|
|
|
+ */
|
|
|
+ this.mapsToSend = new AtomicInteger(0);
|
|
|
+ this.ctx = context;
|
|
|
+ this.user = usr;
|
|
|
+ this.infoMap = mapOutputInfoMap;
|
|
|
+ this.outputBasePathStr = outputBasePath;
|
|
|
+ }
|
|
|
+
|
|
|
+ public int getReduceId() {
|
|
|
+ return reduceId;
|
|
|
+ }
|
|
|
+
|
|
|
+ public ChannelHandlerContext getCtx() {
|
|
|
+ return ctx;
|
|
|
+ }
|
|
|
+
|
|
|
+ public String getUser() {
|
|
|
+ return user;
|
|
|
+ }
|
|
|
+
|
|
|
+ public Map<String, Shuffle.MapOutputInfo> getInfoMap() {
|
|
|
+ return infoMap;
|
|
|
+ }
|
|
|
+
|
|
|
+ public String getOutputBasePathStr() {
|
|
|
+ return outputBasePathStr;
|
|
|
+ }
|
|
|
+
|
|
|
+ public List<String> getMapIds() {
|
|
|
+ return mapIds;
|
|
|
+ }
|
|
|
+
|
|
|
+ public AtomicInteger getMapsToSend() {
|
|
|
+ return mapsToSend;
|
|
|
+ }
|
|
|
+
|
|
|
+ public AtomicInteger getMapsToWait() {
|
|
|
+ return mapsToWait;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
ShuffleHandler(MetricsSystem ms) {
|
|
|
super("httpshuffle");
|
|
|
metrics = ms.register(new ShuffleMetrics());
|
|
@@ -357,6 +464,9 @@ public class ShuffleHandler extends AuxiliaryService {
|
|
|
(Shell.WINDOWS)?WINDOWS_DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED:
|
|
|
DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED);
|
|
|
|
|
|
+ maxSessionOpenFiles = conf.getInt(SHUFFLE_MAX_SESSION_OPEN_FILES,
|
|
|
+ DEFAULT_SHUFFLE_MAX_SESSION_OPEN_FILES);
|
|
|
+
|
|
|
ThreadFactory bossFactory = new ThreadFactoryBuilder()
|
|
|
.setNameFormat("ShuffleHandler Netty Boss #%d")
|
|
|
.build();
|
|
@@ -638,6 +748,10 @@ public class ShuffleHandler extends AuxiliaryService {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ public Shuffle getSHUFFLE() {
|
|
|
+ return SHUFFLE;
|
|
|
+ }
|
|
|
+
|
|
|
public void destroy() {
|
|
|
if (sslFactory != null) {
|
|
|
sslFactory.destroy();
|
|
@@ -809,31 +923,62 @@ public class ShuffleHandler extends AuxiliaryService {
|
|
|
return;
|
|
|
}
|
|
|
ch.write(response);
|
|
|
- // TODO refactor the following into the pipeline
|
|
|
- ChannelFuture lastMap = null;
|
|
|
- for (String mapId : mapIds) {
|
|
|
+ //Initialize one ReduceContext object per messageReceived call
|
|
|
+ ReduceContext reduceContext = new ReduceContext(mapIds, reduceId, ctx,
|
|
|
+ user, mapOutputInfoMap, outputBasePathStr);
|
|
|
+ for (int i = 0; i < Math.min(maxSessionOpenFiles, mapIds.size()); i++) {
|
|
|
+ ChannelFuture nextMap = sendMap(reduceContext);
|
|
|
+ if(nextMap == null) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Calls sendMapOutput for the mapId pointed by ReduceContext.mapsToSend
|
|
|
+ * and increments it. This method is first called by messageReceived()
|
|
|
+ * maxSessionOpenFiles times and then on the completion of every
|
|
|
+ * sendMapOutput operation. This limits the number of open files on a node,
|
|
|
+ * which can get really large(exhausting file descriptors on the NM) if all
|
|
|
+ * sendMapOutputs are called in one go, as was done previous to this change.
|
|
|
+ * @param reduceContext used to call sendMapOutput with correct params.
|
|
|
+ * @return the ChannelFuture of the sendMapOutput, can be null.
|
|
|
+ */
|
|
|
+ public ChannelFuture sendMap(ReduceContext reduceContext)
|
|
|
+ throws Exception {
|
|
|
+
|
|
|
+ ChannelFuture nextMap = null;
|
|
|
+ if (reduceContext.getMapsToSend().get() <
|
|
|
+ reduceContext.getMapIds().size()) {
|
|
|
+ int nextIndex = reduceContext.getMapsToSend().getAndIncrement();
|
|
|
+ String mapId = reduceContext.getMapIds().get(nextIndex);
|
|
|
+
|
|
|
try {
|
|
|
- MapOutputInfo info = mapOutputInfoMap.get(mapId);
|
|
|
+ MapOutputInfo info = reduceContext.getInfoMap().get(mapId);
|
|
|
if (info == null) {
|
|
|
- info = getMapOutputInfo(outputBasePathStr + mapId,
|
|
|
- mapId, reduceId, user);
|
|
|
+ info = getMapOutputInfo(reduceContext.getOutputBasePathStr() +
|
|
|
+ mapId, mapId, reduceContext.getReduceId(),
|
|
|
+ reduceContext.getUser());
|
|
|
}
|
|
|
- lastMap =
|
|
|
- sendMapOutput(ctx, ch, user, mapId,
|
|
|
- reduceId, info);
|
|
|
- if (null == lastMap) {
|
|
|
- sendError(ctx, NOT_FOUND);
|
|
|
- return;
|
|
|
+ nextMap = sendMapOutput(
|
|
|
+ reduceContext.getCtx(),
|
|
|
+ reduceContext.getCtx().getChannel(),
|
|
|
+ reduceContext.getUser(), mapId,
|
|
|
+ reduceContext.getReduceId(), info);
|
|
|
+ if (null == nextMap) {
|
|
|
+ sendError(reduceContext.getCtx(), NOT_FOUND);
|
|
|
+ return null;
|
|
|
}
|
|
|
+ nextMap.addListener(new ReduceMapFileCount(reduceContext));
|
|
|
} catch (IOException e) {
|
|
|
LOG.error("Shuffle error :", e);
|
|
|
String errorMessage = getErrorMessage(e);
|
|
|
- sendError(ctx,errorMessage , INTERNAL_SERVER_ERROR);
|
|
|
- return;
|
|
|
+ sendError(reduceContext.getCtx(), errorMessage,
|
|
|
+ INTERNAL_SERVER_ERROR);
|
|
|
+ return null;
|
|
|
}
|
|
|
}
|
|
|
- lastMap.addListener(metrics);
|
|
|
- lastMap.addListener(ChannelFutureListener.CLOSE);
|
|
|
+ return nextMap;
|
|
|
}
|
|
|
|
|
|
private String getErrorMessage(Throwable t) {
|