|
@@ -104,6 +104,7 @@ import org.jboss.netty.channel.Channel;
|
|
|
import org.jboss.netty.channel.ChannelFactory;
|
|
|
import org.jboss.netty.channel.ChannelFuture;
|
|
|
import org.jboss.netty.channel.ChannelFutureListener;
|
|
|
+import org.jboss.netty.channel.ChannelHandler;
|
|
|
import org.jboss.netty.channel.ChannelHandlerContext;
|
|
|
import org.jboss.netty.channel.ChannelPipeline;
|
|
|
import org.jboss.netty.channel.ChannelPipelineFactory;
|
|
@@ -126,7 +127,13 @@ import org.jboss.netty.handler.codec.http.HttpResponseStatus;
|
|
|
import org.jboss.netty.handler.codec.http.QueryStringDecoder;
|
|
|
import org.jboss.netty.handler.ssl.SslHandler;
|
|
|
import org.jboss.netty.handler.stream.ChunkedWriteHandler;
|
|
|
+import org.jboss.netty.handler.timeout.IdleState;
|
|
|
+import org.jboss.netty.handler.timeout.IdleStateAwareChannelHandler;
|
|
|
+import org.jboss.netty.handler.timeout.IdleStateEvent;
|
|
|
+import org.jboss.netty.handler.timeout.IdleStateHandler;
|
|
|
import org.jboss.netty.util.CharsetUtil;
|
|
|
+import org.jboss.netty.util.HashedWheelTimer;
|
|
|
+import org.jboss.netty.util.Timer;
|
|
|
import org.eclipse.jetty.http.HttpHeader;
|
|
|
|
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
@@ -240,6 +247,7 @@ public class ShuffleHandler extends AuxiliaryService {
|
|
|
public static final boolean DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED = true;
|
|
|
public static final boolean WINDOWS_DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED =
|
|
|
false;
|
|
|
+ private static final String TIMEOUT_HANDLER = "timeout";
|
|
|
|
|
|
/* the maximum number of files a single GET request can
|
|
|
open simultaneously during shuffle
|
|
@@ -249,8 +257,9 @@ public class ShuffleHandler extends AuxiliaryService {
|
|
|
public static final int DEFAULT_SHUFFLE_MAX_SESSION_OPEN_FILES = 3;
|
|
|
|
|
|
boolean connectionKeepAliveEnabled = false;
|
|
|
- int connectionKeepAliveTimeOut;
|
|
|
- int mapOutputMetaInfoCacheSize;
|
|
|
+ private int connectionKeepAliveTimeOut;
|
|
|
+ private int mapOutputMetaInfoCacheSize;
|
|
|
+ private Timer timer;
|
|
|
|
|
|
@Metrics(about="Shuffle output metrics", context="mapred")
|
|
|
static class ShuffleMetrics implements ChannelFutureListener {
|
|
@@ -293,7 +302,15 @@ public class ShuffleHandler extends AuxiliaryService {
|
|
|
int waitCount = this.reduceContext.getMapsToWait().decrementAndGet();
|
|
|
if (waitCount == 0) {
|
|
|
metrics.operationComplete(future);
|
|
|
- future.getChannel().close();
|
|
|
+ // Let the idle timer handler close keep-alive connections
|
|
|
+ if (reduceContext.getKeepAlive()) {
|
|
|
+ ChannelPipeline pipeline = future.getChannel().getPipeline();
|
|
|
+ TimeoutHandler timeoutHandler =
|
|
|
+ (TimeoutHandler)pipeline.get(TIMEOUT_HANDLER);
|
|
|
+ timeoutHandler.setEnabledTimeout(true);
|
|
|
+ } else {
|
|
|
+ future.getChannel().close();
|
|
|
+ }
|
|
|
} else {
|
|
|
pipelineFact.getSHUFFLE().sendMap(reduceContext);
|
|
|
}
|
|
@@ -314,11 +331,12 @@ public class ShuffleHandler extends AuxiliaryService {
|
|
|
private String user;
|
|
|
private Map<String, Shuffle.MapOutputInfo> infoMap;
|
|
|
private String jobId;
|
|
|
+ private final boolean keepAlive;
|
|
|
|
|
|
public ReduceContext(List<String> mapIds, int rId,
|
|
|
ChannelHandlerContext context, String usr,
|
|
|
Map<String, Shuffle.MapOutputInfo> mapOutputInfoMap,
|
|
|
- String jobId) {
|
|
|
+ String jobId, boolean keepAlive) {
|
|
|
|
|
|
this.mapIds = mapIds;
|
|
|
this.reduceId = rId;
|
|
@@ -339,6 +357,7 @@ public class ShuffleHandler extends AuxiliaryService {
|
|
|
this.user = usr;
|
|
|
this.infoMap = mapOutputInfoMap;
|
|
|
this.jobId = jobId;
|
|
|
+ this.keepAlive = keepAlive;
|
|
|
}
|
|
|
|
|
|
public int getReduceId() {
|
|
@@ -372,6 +391,10 @@ public class ShuffleHandler extends AuxiliaryService {
|
|
|
public AtomicInteger getMapsToWait() {
|
|
|
return mapsToWait;
|
|
|
}
|
|
|
+
|
|
|
+ public boolean getKeepAlive() {
|
|
|
+ return keepAlive;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
ShuffleHandler(MetricsSystem ms) {
|
|
@@ -508,8 +531,10 @@ public class ShuffleHandler extends AuxiliaryService {
|
|
|
secretManager = new JobTokenSecretManager();
|
|
|
recoverState(conf);
|
|
|
ServerBootstrap bootstrap = new ServerBootstrap(selector);
|
|
|
+ // Timer is shared across entire factory and must be released separately
|
|
|
+ timer = new HashedWheelTimer();
|
|
|
try {
|
|
|
- pipelineFact = new HttpPipelineFactory(conf);
|
|
|
+ pipelineFact = new HttpPipelineFactory(conf, timer);
|
|
|
} catch (Exception ex) {
|
|
|
throw new RuntimeException(ex);
|
|
|
}
|
|
@@ -549,6 +574,10 @@ public class ShuffleHandler extends AuxiliaryService {
|
|
|
if (pipelineFact != null) {
|
|
|
pipelineFact.destroy();
|
|
|
}
|
|
|
+ if (timer != null) {
|
|
|
+ // Release this shared timer resource
|
|
|
+ timer.stop();
|
|
|
+ }
|
|
|
if (stateDb != null) {
|
|
|
stateDb.close();
|
|
|
}
|
|
@@ -755,12 +784,29 @@ public class ShuffleHandler extends AuxiliaryService {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ static class TimeoutHandler extends IdleStateAwareChannelHandler {
|
|
|
+
|
|
|
+ private boolean enabledTimeout;
|
|
|
+
|
|
|
+ void setEnabledTimeout(boolean enabledTimeout) {
|
|
|
+ this.enabledTimeout = enabledTimeout;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e) {
|
|
|
+ if (e.getState() == IdleState.WRITER_IDLE && enabledTimeout) {
|
|
|
+ e.getChannel().close();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
class HttpPipelineFactory implements ChannelPipelineFactory {
|
|
|
|
|
|
final Shuffle SHUFFLE;
|
|
|
private SSLFactory sslFactory;
|
|
|
+ private final ChannelHandler idleStateHandler;
|
|
|
|
|
|
- public HttpPipelineFactory(Configuration conf) throws Exception {
|
|
|
+ public HttpPipelineFactory(Configuration conf, Timer timer) throws Exception {
|
|
|
SHUFFLE = getShuffle(conf);
|
|
|
if (conf.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY,
|
|
|
MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT)) {
|
|
@@ -768,6 +814,7 @@ public class ShuffleHandler extends AuxiliaryService {
|
|
|
sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf);
|
|
|
sslFactory.init();
|
|
|
}
|
|
|
+ this.idleStateHandler = new IdleStateHandler(timer, 0, connectionKeepAliveTimeOut, 0);
|
|
|
}
|
|
|
|
|
|
public Shuffle getSHUFFLE() {
|
|
@@ -791,6 +838,8 @@ public class ShuffleHandler extends AuxiliaryService {
|
|
|
pipeline.addLast("encoder", new HttpResponseEncoder());
|
|
|
pipeline.addLast("chunking", new ChunkedWriteHandler());
|
|
|
pipeline.addLast("shuffle", SHUFFLE);
|
|
|
+ pipeline.addLast("idle", idleStateHandler);
|
|
|
+ pipeline.addLast(TIMEOUT_HANDLER, new TimeoutHandler());
|
|
|
return pipeline;
|
|
|
// TODO factor security manager into pipeline
|
|
|
// TODO factor out encode/decode to permit binary shuffle
|
|
@@ -981,6 +1030,10 @@ public class ShuffleHandler extends AuxiliaryService {
|
|
|
Map<String, MapOutputInfo> mapOutputInfoMap =
|
|
|
new HashMap<String, MapOutputInfo>();
|
|
|
Channel ch = evt.getChannel();
|
|
|
+ ChannelPipeline pipeline = ch.getPipeline();
|
|
|
+ TimeoutHandler timeoutHandler =
|
|
|
+ (TimeoutHandler)pipeline.get(TIMEOUT_HANDLER);
|
|
|
+ timeoutHandler.setEnabledTimeout(false);
|
|
|
String user = userRsrc.get(jobId);
|
|
|
|
|
|
try {
|
|
@@ -995,8 +1048,9 @@ public class ShuffleHandler extends AuxiliaryService {
|
|
|
}
|
|
|
ch.write(response);
|
|
|
//Initialize one ReduceContext object per messageReceived call
|
|
|
+ boolean keepAlive = keepAliveParam || connectionKeepAliveEnabled;
|
|
|
ReduceContext reduceContext = new ReduceContext(mapIds, reduceId, ctx,
|
|
|
- user, mapOutputInfoMap, jobId);
|
|
|
+ user, mapOutputInfoMap, jobId, keepAlive);
|
|
|
for (int i = 0; i < Math.min(maxSessionOpenFiles, mapIds.size()); i++) {
|
|
|
ChannelFuture nextMap = sendMap(reduceContext);
|
|
|
if(nextMap == null) {
|