|
@@ -80,6 +80,8 @@ import org.apache.hadoop.yarn.server.records.Version;
|
|
|
import org.jboss.netty.channel.Channel;
|
|
|
import org.jboss.netty.channel.ChannelFuture;
|
|
|
import org.jboss.netty.channel.ChannelHandlerContext;
|
|
|
+import org.jboss.netty.channel.ChannelStateEvent;
|
|
|
+import org.jboss.netty.channel.socket.SocketChannel;
|
|
|
import org.jboss.netty.channel.MessageEvent;
|
|
|
import org.jboss.netty.channel.AbstractChannel;
|
|
|
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
|
|
@@ -140,6 +142,27 @@ public class TestShuffleHandler {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private static class MockShuffleHandler2 extends org.apache.hadoop.mapred.ShuffleHandler {
|
|
|
+ boolean socketKeepAlive = false;
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected Shuffle getShuffle(final Configuration conf) {
|
|
|
+ return new Shuffle(conf) {
|
|
|
+ @Override
|
|
|
+ protected void verifyRequest(String appid, ChannelHandlerContext ctx,
|
|
|
+ HttpRequest request, HttpResponse response, URL requestUri)
|
|
|
+ throws IOException {
|
|
|
+ SocketChannel channel = (SocketChannel)(ctx.getChannel());
|
|
|
+ socketKeepAlive = channel.getConfig().isKeepAlive();
|
|
|
+ }
|
|
|
+ };
|
|
|
+ }
|
|
|
+
|
|
|
+ protected boolean isSocketKeepAlive() {
|
|
|
+ return socketKeepAlive;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Test the validation of ShuffleHandler's meta-data's serialization and
|
|
|
* de-serialization.
|
|
@@ -423,6 +446,42 @@ public class TestShuffleHandler {
|
|
|
input.close();
|
|
|
}
|
|
|
|
|
|
+ @Test(timeout = 10000)
|
|
|
+ public void testSocketKeepAlive() throws Exception {
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
|
|
|
+ conf.setBoolean(ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED, true);
|
|
|
+ // try setting to -ve keep alive timeout.
|
|
|
+ conf.setInt(ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT, -100);
|
|
|
+ HttpURLConnection conn = null;
|
|
|
+ MockShuffleHandler2 shuffleHandler = new MockShuffleHandler2();
|
|
|
+ try {
|
|
|
+ shuffleHandler.init(conf);
|
|
|
+ shuffleHandler.start();
|
|
|
+
|
|
|
+ String shuffleBaseURL = "http://127.0.0.1:"
|
|
|
+ + shuffleHandler.getConfig().get(
|
|
|
+ ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY);
|
|
|
+ URL url =
|
|
|
+ new URL(shuffleBaseURL + "/mapOutput?job=job_12345_1&reduce=1&"
|
|
|
+ + "map=attempt_12345_1_m_1_0");
|
|
|
+ conn = (HttpURLConnection) url.openConnection();
|
|
|
+ conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
|
|
|
+ ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
|
|
|
+ conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
|
|
|
+ ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
|
|
|
+ conn.connect();
|
|
|
+ conn.getInputStream();
|
|
|
+ Assert.assertTrue("socket should be set KEEP_ALIVE",
|
|
|
+ shuffleHandler.isSocketKeepAlive());
|
|
|
+ } finally {
|
|
|
+ if (conn != null) {
|
|
|
+ conn.disconnect();
|
|
|
+ }
|
|
|
+ shuffleHandler.stop();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* simulate a reducer that sends an invalid shuffle-header - sometimes a wrong
|
|
|
* header_name and sometimes a wrong version
|