|
@@ -28,9 +28,12 @@ import static org.junit.Assert.assertEquals;
|
|
|
import java.io.DataInputStream;
|
|
|
import java.io.IOException;
|
|
|
import java.net.HttpURLConnection;
|
|
|
+import java.net.SocketException;
|
|
|
import java.net.URL;
|
|
|
import java.util.ArrayList;
|
|
|
|
|
|
+import org.apache.commons.logging.Log;
|
|
|
+import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.io.DataOutputBuffer;
|
|
|
import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader;
|
|
@@ -49,8 +52,10 @@ import org.junit.Test;
|
|
|
|
|
|
public class TestShuffleHandler {
|
|
|
static final long MiB = 1024 * 1024;
|
|
|
+ private static final Log LOG = LogFactory.getLog(TestShuffleHandler.class);
|
|
|
|
|
|
- @Test public void testSerializeMeta() throws Exception {
|
|
|
+ @Test(timeout = 10000)
|
|
|
+ public void testSerializeMeta() throws Exception {
|
|
|
assertEquals(1, ShuffleHandler.deserializeMetaData(
|
|
|
ShuffleHandler.serializeMetaData(1)));
|
|
|
assertEquals(-1, ShuffleHandler.deserializeMetaData(
|
|
@@ -59,7 +64,8 @@ public class TestShuffleHandler {
|
|
|
ShuffleHandler.serializeMetaData(8080)));
|
|
|
}
|
|
|
|
|
|
- @Test public void testShuffleMetrics() throws Exception {
|
|
|
+ @Test(timeout = 10000)
|
|
|
+ public void testShuffleMetrics() throws Exception {
|
|
|
MetricsSystem ms = new MetricsSystemImpl();
|
|
|
ShuffleHandler sh = new ShuffleHandler(ms);
|
|
|
ChannelFuture cf = make(stub(ChannelFuture.class).
|
|
@@ -88,7 +94,7 @@ public class TestShuffleHandler {
|
|
|
assertGauge("ShuffleConnections", connections, rb);
|
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
+ @Test(timeout = 10000)
|
|
|
public void testClientClosesConnection() throws Exception {
|
|
|
final ArrayList<Throwable> failures = new ArrayList<Throwable>(1);
|
|
|
Configuration conf = new Configuration();
|
|
@@ -103,6 +109,7 @@ public class TestShuffleHandler {
|
|
|
HttpRequest request, HttpResponse response, URL requestUri)
|
|
|
throws IOException {
|
|
|
}
|
|
|
+
|
|
|
@Override
|
|
|
protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx,
|
|
|
Channel ch, String user, String jobId, String mapId, int reduce)
|
|
@@ -120,6 +127,7 @@ public class TestShuffleHandler {
|
|
|
}
|
|
|
return ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
|
|
|
}
|
|
|
+
|
|
|
@Override
|
|
|
protected void sendError(ChannelHandlerContext ctx,
|
|
|
HttpResponseStatus status) {
|
|
@@ -128,6 +136,7 @@ public class TestShuffleHandler {
|
|
|
ctx.getChannel().close();
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
@Override
|
|
|
protected void sendError(ChannelHandlerContext ctx, String message,
|
|
|
HttpResponseStatus status) {
|
|
@@ -159,4 +168,82 @@ public class TestShuffleHandler {
|
|
|
Assert.assertTrue("sendError called when client closed connection",
|
|
|
failures.size() == 0);
|
|
|
}
|
|
|
+
|
|
|
+ @Test(timeout = 10000)
|
|
|
+ public void testMaxConnections() throws Exception {
|
|
|
+
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
|
|
|
+ conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3);
|
|
|
+
|
|
|
+ ShuffleHandler shuffleHandler = new ShuffleHandler() {
|
|
|
+ @Override
|
|
|
+ protected Shuffle getShuffle(Configuration conf) {
|
|
|
+ // replace the shuffle handler with one stubbed for testing
|
|
|
+ return new Shuffle(conf) {
|
|
|
+ @Override
|
|
|
+ protected void verifyRequest(String appid, ChannelHandlerContext ctx,
|
|
|
+ HttpRequest request, HttpResponse response, URL requestUri)
|
|
|
+ throws IOException {
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx,
|
|
|
+ Channel ch, String user, String jobId, String mapId, int reduce)
|
|
|
+ throws IOException {
|
|
|
+ // send a shuffle header and a lot of data down the channel
|
|
|
+ // to trigger a broken pipe
|
|
|
+ ShuffleHeader header = new ShuffleHeader("dummy_header", 5678,
|
|
|
+ 5678, 1);
|
|
|
+ DataOutputBuffer dob = new DataOutputBuffer();
|
|
|
+ header.write(dob);
|
|
|
+ ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
|
|
|
+ dob = new DataOutputBuffer();
|
|
|
+ for (int i = 0; i < 100000; ++i) {
|
|
|
+ header.write(dob);
|
|
|
+ }
|
|
|
+ return ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
|
|
|
+ }
|
|
|
+ };
|
|
|
+ }
|
|
|
+ };
|
|
|
+ shuffleHandler.init(conf);
|
|
|
+ shuffleHandler.start();
|
|
|
+ // setup connections
|
|
|
+ Integer connAttempts = 3;
|
|
|
+ HttpURLConnection conns[] = new HttpURLConnection[connAttempts];
|
|
|
+ for (Integer i = 0; i < connAttempts; i++) {
|
|
|
+ String URLstring = "http://127.0.0.1:"
|
|
|
+ + shuffleHandler.getConfig().get(
|
|
|
+ ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY)
|
|
|
+ + "/mapOutput?job=job_12345_1&reduce=1&map=attempt_12345_1_m_"
|
|
|
+ + i.toString() + "_0";
|
|
|
+ URL url = new URL(URLstring);
|
|
|
+ conns[i] = (HttpURLConnection) url.openConnection();
|
|
|
+ }
|
|
|
+ // Try to open numerous connections
|
|
|
+ for (Integer i = 0; i < connAttempts; i++) {
|
|
|
+ conns[i].connect();
|
|
|
+ }
|
|
|
+ //Check that first 2 connection are okay
|
|
|
+ conns[0].getInputStream();
|
|
|
+ int rc = conns[0].getResponseCode();
|
|
|
+ Assert.assertEquals(HttpURLConnection.HTTP_OK, rc);
|
|
|
+ Thread.sleep(100);
|
|
|
+ conns[1].getInputStream();
|
|
|
+ rc = conns[1].getResponseCode();
|
|
|
+ Assert.assertEquals(HttpURLConnection.HTTP_OK, rc);
|
|
|
+
|
|
|
+ // This connection should be closed because it to above the limit
|
|
|
+ try {
|
|
|
+ conns[2].getInputStream();
|
|
|
+ rc = conns[2].getResponseCode();
|
|
|
+ Assert.fail("Expected a SocketException");
|
|
|
+ } catch (SocketException se) {
|
|
|
+ LOG.info("Expected - connection should not be open");
|
|
|
+ } catch (Exception e) {
|
|
|
+ Assert.fail("Expected a SocketException");
|
|
|
+ }
|
|
|
+ shuffleHandler.stop();
|
|
|
+ }
|
|
|
}
|