|
@@ -18,6 +18,23 @@
|
|
|
|
|
|
package org.apache.hadoop.mapreduce.task.reduce;
|
|
|
|
|
|
+import java.io.FilterInputStream;
|
|
|
+
|
|
|
+import java.lang.Void;
|
|
|
+
|
|
|
+import java.net.HttpURLConnection;
|
|
|
+
|
|
|
+import org.apache.hadoop.fs.FileSystem;
|
|
|
+import org.apache.hadoop.fs.Path;
|
|
|
+import org.apache.hadoop.mapred.MapOutputFile;
|
|
|
+import org.apache.hadoop.mapreduce.TaskID;
|
|
|
+
|
|
|
+import org.junit.After;
|
|
|
+import org.junit.Before;
|
|
|
+import org.junit.Rule;
|
|
|
+import org.junit.rules.TestName;
|
|
|
+import static org.junit.Assert.*;
|
|
|
+
|
|
|
import static org.mockito.Matchers.*;
|
|
|
import static org.mockito.Mockito.*;
|
|
|
|
|
@@ -26,7 +43,6 @@ import java.io.ByteArrayOutputStream;
|
|
|
import java.io.DataOutputStream;
|
|
|
import java.io.IOException;
|
|
|
import java.io.InputStream;
|
|
|
-import java.net.HttpURLConnection;
|
|
|
import java.net.SocketTimeoutException;
|
|
|
import java.net.URL;
|
|
|
import java.util.ArrayList;
|
|
@@ -37,7 +53,6 @@ import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.io.Text;
|
|
|
import org.apache.hadoop.mapred.Counters;
|
|
|
-import org.apache.hadoop.mapred.IFileOutputStream;
|
|
|
import org.apache.hadoop.mapred.JobConf;
|
|
|
import org.apache.hadoop.mapred.Reporter;
|
|
|
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
|
@@ -45,69 +60,68 @@ import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
|
|
|
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
|
|
|
import org.junit.Test;
|
|
|
|
|
|
+import org.mockito.invocation.InvocationOnMock;
|
|
|
+import org.mockito.stubbing.Answer;
|
|
|
+
|
|
|
/**
|
|
|
* Test that the Fetcher does what we expect it to.
|
|
|
*/
|
|
|
public class TestFetcher {
|
|
|
private static final Log LOG = LogFactory.getLog(TestFetcher.class);
|
|
|
+ JobConf job = null;
|
|
|
+ TaskAttemptID id = null;
|
|
|
+ ShuffleSchedulerImpl<Text, Text> ss = null;
|
|
|
+ MergeManagerImpl<Text, Text> mm = null;
|
|
|
+ Reporter r = null;
|
|
|
+ ShuffleClientMetrics metrics = null;
|
|
|
+ ExceptionReporter except = null;
|
|
|
+ SecretKey key = null;
|
|
|
+ HttpURLConnection connection = null;
|
|
|
+ Counters.Counter allErrs = null;
|
|
|
|
|
|
- public static class FakeFetcher<K,V> extends Fetcher<K,V> {
|
|
|
+ final String encHash = "vFE234EIFCiBgYs2tCXY/SjT8Kg=";
|
|
|
+ final MapHost host = new MapHost("localhost", "http://localhost:8080/");
|
|
|
+ final TaskAttemptID map1ID = TaskAttemptID.forName("attempt_0_1_m_1_1");
|
|
|
+ final TaskAttemptID map2ID = TaskAttemptID.forName("attempt_0_1_m_2_1");
|
|
|
|
|
|
- private HttpURLConnection connection;
|
|
|
+ @Rule public TestName name = new TestName();
|
|
|
|
|
|
- public FakeFetcher(JobConf job, TaskAttemptID reduceId,
|
|
|
- ShuffleSchedulerImpl<K,V> scheduler, MergeManagerImpl<K,V> merger,
|
|
|
- Reporter reporter, ShuffleClientMetrics metrics,
|
|
|
- ExceptionReporter exceptionReporter, SecretKey jobTokenSecret,
|
|
|
- HttpURLConnection connection) {
|
|
|
- super(job, reduceId, scheduler, merger, reporter, metrics, exceptionReporter,
|
|
|
- jobTokenSecret);
|
|
|
- this.connection = connection;
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- protected HttpURLConnection openConnection(URL url) throws IOException {
|
|
|
- if(connection != null) {
|
|
|
- return connection;
|
|
|
- }
|
|
|
- return super.openConnection(url);
|
|
|
- }
|
|
|
+ @Before
|
|
|
+ @SuppressWarnings("unchecked") // mocked generics
|
|
|
+ public void setup() {
|
|
|
+ LOG.info(">>>> " + name.getMethodName());
|
|
|
+ job = new JobConf();
|
|
|
+ id = TaskAttemptID.forName("attempt_0_1_r_1_1");
|
|
|
+ ss = mock(ShuffleSchedulerImpl.class);
|
|
|
+ mm = mock(MergeManagerImpl.class);
|
|
|
+ r = mock(Reporter.class);
|
|
|
+ metrics = mock(ShuffleClientMetrics.class);
|
|
|
+ except = mock(ExceptionReporter.class);
|
|
|
+ key = JobTokenSecretManager.createSecretKey(new byte[]{0,0,0,0});
|
|
|
+ connection = mock(HttpURLConnection.class);
|
|
|
+
|
|
|
+ allErrs = mock(Counters.Counter.class);
|
|
|
+ when(r.getCounter(anyString(), anyString())).thenReturn(allErrs);
|
|
|
+
|
|
|
+ ArrayList<TaskAttemptID> maps = new ArrayList<TaskAttemptID>(1);
|
|
|
+ maps.add(map1ID);
|
|
|
+ maps.add(map2ID);
|
|
|
+ when(ss.getMapsForHost(host)).thenReturn(maps);
|
|
|
+ }
|
|
|
+
|
|
|
+ @After
|
|
|
+ public void teardown() {
|
|
|
+ LOG.info("<<<< " + name.getMethodName());
|
|
|
}
|
|
|
|
|
|
- @SuppressWarnings("unchecked")
|
|
|
@Test(timeout=30000)
|
|
|
public void testCopyFromHostConnectionTimeout() throws Exception {
|
|
|
- LOG.info("testCopyFromHostConnectionTimeout");
|
|
|
- JobConf job = new JobConf();
|
|
|
- TaskAttemptID id = TaskAttemptID.forName("attempt_0_1_r_1_1");
|
|
|
- ShuffleSchedulerImpl<Text, Text> ss = mock(ShuffleSchedulerImpl.class);
|
|
|
- MergeManagerImpl<Text, Text> mm = mock(MergeManagerImpl.class);
|
|
|
- Reporter r = mock(Reporter.class);
|
|
|
- ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
|
|
|
- ExceptionReporter except = mock(ExceptionReporter.class);
|
|
|
- SecretKey key = JobTokenSecretManager.createSecretKey(new byte[]{0,0,0,0});
|
|
|
- HttpURLConnection connection = mock(HttpURLConnection.class);
|
|
|
when(connection.getInputStream()).thenThrow(
|
|
|
new SocketTimeoutException("This is a fake timeout :)"));
|
|
|
|
|
|
- Counters.Counter allErrs = mock(Counters.Counter.class);
|
|
|
- when(r.getCounter(anyString(), anyString()))
|
|
|
- .thenReturn(allErrs);
|
|
|
-
|
|
|
Fetcher<Text,Text> underTest = new FakeFetcher<Text,Text>(job, id, ss, mm,
|
|
|
r, metrics, except, key, connection);
|
|
|
|
|
|
- MapHost host = new MapHost("localhost", "http://localhost:8080/");
|
|
|
-
|
|
|
- ArrayList<TaskAttemptID> maps = new ArrayList<TaskAttemptID>(1);
|
|
|
- TaskAttemptID map1ID = TaskAttemptID.forName("attempt_0_1_m_1_1");
|
|
|
- maps.add(map1ID);
|
|
|
- TaskAttemptID map2ID = TaskAttemptID.forName("attempt_0_1_m_2_1");
|
|
|
- maps.add(map2ID);
|
|
|
- when(ss.getMapsForHost(host)).thenReturn(maps);
|
|
|
-
|
|
|
- String encHash = "vFE234EIFCiBgYs2tCXY/SjT8Kg=";
|
|
|
-
|
|
|
underTest.copyFromHost(host);
|
|
|
|
|
|
verify(connection)
|
|
@@ -122,38 +136,11 @@ public class TestFetcher {
|
|
|
verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map2ID));
|
|
|
}
|
|
|
|
|
|
- @SuppressWarnings("unchecked")
|
|
|
@Test
|
|
|
public void testCopyFromHostBogusHeader() throws Exception {
|
|
|
- LOG.info("testCopyFromHostBogusHeader");
|
|
|
- JobConf job = new JobConf();
|
|
|
- TaskAttemptID id = TaskAttemptID.forName("attempt_0_1_r_1_1");
|
|
|
- ShuffleSchedulerImpl<Text, Text> ss = mock(ShuffleSchedulerImpl.class);
|
|
|
- MergeManagerImpl<Text, Text> mm = mock(MergeManagerImpl.class);
|
|
|
- Reporter r = mock(Reporter.class);
|
|
|
- ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
|
|
|
- ExceptionReporter except = mock(ExceptionReporter.class);
|
|
|
- SecretKey key = JobTokenSecretManager.createSecretKey(new byte[]{0,0,0,0});
|
|
|
- HttpURLConnection connection = mock(HttpURLConnection.class);
|
|
|
-
|
|
|
- Counters.Counter allErrs = mock(Counters.Counter.class);
|
|
|
- when(r.getCounter(anyString(), anyString()))
|
|
|
- .thenReturn(allErrs);
|
|
|
-
|
|
|
Fetcher<Text,Text> underTest = new FakeFetcher<Text,Text>(job, id, ss, mm,
|
|
|
r, metrics, except, key, connection);
|
|
|
-
|
|
|
|
|
|
- MapHost host = new MapHost("localhost", "http://localhost:8080/");
|
|
|
-
|
|
|
- ArrayList<TaskAttemptID> maps = new ArrayList<TaskAttemptID>(1);
|
|
|
- TaskAttemptID map1ID = TaskAttemptID.forName("attempt_0_1_m_1_1");
|
|
|
- maps.add(map1ID);
|
|
|
- TaskAttemptID map2ID = TaskAttemptID.forName("attempt_0_1_m_2_1");
|
|
|
- maps.add(map2ID);
|
|
|
- when(ss.getMapsForHost(host)).thenReturn(maps);
|
|
|
-
|
|
|
- String encHash = "vFE234EIFCiBgYs2tCXY/SjT8Kg=";
|
|
|
String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key);
|
|
|
|
|
|
when(connection.getResponseCode()).thenReturn(200);
|
|
@@ -177,38 +164,11 @@ public class TestFetcher {
|
|
|
verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map2ID));
|
|
|
}
|
|
|
|
|
|
- @SuppressWarnings("unchecked")
|
|
|
@Test
|
|
|
public void testCopyFromHostWait() throws Exception {
|
|
|
- LOG.info("testCopyFromHostWait");
|
|
|
- JobConf job = new JobConf();
|
|
|
- TaskAttemptID id = TaskAttemptID.forName("attempt_0_1_r_1_1");
|
|
|
- ShuffleSchedulerImpl<Text, Text> ss = mock(ShuffleSchedulerImpl.class);
|
|
|
- MergeManagerImpl<Text, Text> mm = mock(MergeManagerImpl.class);
|
|
|
- Reporter r = mock(Reporter.class);
|
|
|
- ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
|
|
|
- ExceptionReporter except = mock(ExceptionReporter.class);
|
|
|
- SecretKey key = JobTokenSecretManager.createSecretKey(new byte[]{0,0,0,0});
|
|
|
- HttpURLConnection connection = mock(HttpURLConnection.class);
|
|
|
-
|
|
|
- Counters.Counter allErrs = mock(Counters.Counter.class);
|
|
|
- when(r.getCounter(anyString(), anyString()))
|
|
|
- .thenReturn(allErrs);
|
|
|
-
|
|
|
Fetcher<Text,Text> underTest = new FakeFetcher<Text,Text>(job, id, ss, mm,
|
|
|
r, metrics, except, key, connection);
|
|
|
-
|
|
|
|
|
|
- MapHost host = new MapHost("localhost", "http://localhost:8080/");
|
|
|
-
|
|
|
- ArrayList<TaskAttemptID> maps = new ArrayList<TaskAttemptID>(1);
|
|
|
- TaskAttemptID map1ID = TaskAttemptID.forName("attempt_0_1_m_1_1");
|
|
|
- maps.add(map1ID);
|
|
|
- TaskAttemptID map2ID = TaskAttemptID.forName("attempt_0_1_m_2_1");
|
|
|
- maps.add(map2ID);
|
|
|
- when(ss.getMapsForHost(host)).thenReturn(maps);
|
|
|
-
|
|
|
- String encHash = "vFE234EIFCiBgYs2tCXY/SjT8Kg=";
|
|
|
String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key);
|
|
|
|
|
|
when(connection.getResponseCode()).thenReturn(200);
|
|
@@ -235,112 +195,15 @@ public class TestFetcher {
|
|
|
verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map1ID));
|
|
|
verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map2ID));
|
|
|
}
|
|
|
- @SuppressWarnings("unchecked")
|
|
|
- @Test
|
|
|
- public void testCopyFromHostExtraBytes() throws Exception {
|
|
|
- LOG.info("testCopyFromHostWaitExtraBytes");
|
|
|
- JobConf job = new JobConf();
|
|
|
- TaskAttemptID id = TaskAttemptID.forName("attempt_0_1_r_1_1");
|
|
|
- ShuffleSchedulerImpl<Text, Text> ss = mock(ShuffleSchedulerImpl.class);
|
|
|
- MergeManagerImpl<Text, Text> mm = mock(MergeManagerImpl.class);
|
|
|
- InMemoryMapOutput<Text, Text> immo = mock(InMemoryMapOutput.class);
|
|
|
-
|
|
|
- Reporter r = mock(Reporter.class);
|
|
|
- ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
|
|
|
- ExceptionReporter except = mock(ExceptionReporter.class);
|
|
|
- SecretKey key = JobTokenSecretManager.createSecretKey(new byte[]{0,0,0,0});
|
|
|
- HttpURLConnection connection = mock(HttpURLConnection.class);
|
|
|
-
|
|
|
- Counters.Counter allErrs = mock(Counters.Counter.class);
|
|
|
- when(r.getCounter(anyString(), anyString()))
|
|
|
- .thenReturn(allErrs);
|
|
|
-
|
|
|
- Fetcher<Text,Text> underTest = new FakeFetcher<Text,Text>(job, id, ss, mm,
|
|
|
- r, metrics, except, key, connection);
|
|
|
-
|
|
|
- MapHost host = new MapHost("localhost", "http://localhost:8080/");
|
|
|
-
|
|
|
- ArrayList<TaskAttemptID> maps = new ArrayList<TaskAttemptID>(1);
|
|
|
- TaskAttemptID map1ID = TaskAttemptID.forName("attempt_0_1_m_1_1");
|
|
|
- maps.add(map1ID);
|
|
|
- TaskAttemptID map2ID = TaskAttemptID.forName("attempt_0_1_m_2_1");
|
|
|
- maps.add(map2ID);
|
|
|
- when(ss.getMapsForHost(host)).thenReturn(maps);
|
|
|
-
|
|
|
- String encHash = "vFE234EIFCiBgYs2tCXY/SjT8Kg=";
|
|
|
- String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key);
|
|
|
-
|
|
|
- when(connection.getResponseCode()).thenReturn(200);
|
|
|
- when(connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH))
|
|
|
- .thenReturn(replyHash);
|
|
|
- ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 14, 10, 1);
|
|
|
-
|
|
|
- ByteArrayOutputStream bout = new ByteArrayOutputStream();
|
|
|
- DataOutputStream dos = new DataOutputStream(bout);
|
|
|
- IFileOutputStream ios = new IFileOutputStream(dos);
|
|
|
- header.write(dos);
|
|
|
- ios.write("MAPDATA123".getBytes());
|
|
|
- ios.finish();
|
|
|
-
|
|
|
- ShuffleHeader header2 = new ShuffleHeader(map2ID.toString(), 14, 10, 1);
|
|
|
- IFileOutputStream ios2 = new IFileOutputStream(dos);
|
|
|
- header2.write(dos);
|
|
|
- ios2.write("MAPDATA456".getBytes());
|
|
|
- ios2.finish();
|
|
|
-
|
|
|
- ByteArrayInputStream in = new ByteArrayInputStream(bout.toByteArray());
|
|
|
- when(connection.getInputStream()).thenReturn(in);
|
|
|
- // 8 < 10 therefore there appear to be extra bytes in the IFileInputStream
|
|
|
- InMemoryMapOutput<Text, Text> mapOut = new InMemoryMapOutput<Text, Text>(job, map1ID, mm, 8, null, true );
|
|
|
- InMemoryMapOutput<Text, Text> mapOut2 = new InMemoryMapOutput<Text, Text>(job, map2ID, mm, 10, null, true );
|
|
|
-
|
|
|
- when(mm.reserve(eq(map1ID), anyLong(), anyInt())).thenReturn(mapOut);
|
|
|
- when(mm.reserve(eq(map2ID), anyLong(), anyInt())).thenReturn(mapOut2);
|
|
|
-
|
|
|
-
|
|
|
- underTest.copyFromHost(host);
|
|
|
-
|
|
|
-
|
|
|
- verify(allErrs).increment(1);
|
|
|
- verify(ss).copyFailed(map1ID, host, true, false);
|
|
|
- verify(ss, never()).copyFailed(map2ID, host, true, false);
|
|
|
-
|
|
|
- verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map1ID));
|
|
|
- verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map2ID));
|
|
|
- }
|
|
|
|
|
|
@SuppressWarnings("unchecked")
|
|
|
@Test(timeout=10000)
|
|
|
public void testCopyFromHostCompressFailure() throws Exception {
|
|
|
- LOG.info("testCopyFromHostCompressFailure");
|
|
|
- JobConf job = new JobConf();
|
|
|
- TaskAttemptID id = TaskAttemptID.forName("attempt_0_1_r_1_1");
|
|
|
- ShuffleSchedulerImpl<Text, Text> ss = mock(ShuffleSchedulerImpl.class);
|
|
|
- MergeManagerImpl<Text, Text> mm = mock(MergeManagerImpl.class);
|
|
|
InMemoryMapOutput<Text, Text> immo = mock(InMemoryMapOutput.class);
|
|
|
- Reporter r = mock(Reporter.class);
|
|
|
- ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
|
|
|
- ExceptionReporter except = mock(ExceptionReporter.class);
|
|
|
- SecretKey key = JobTokenSecretManager.createSecretKey(new byte[]{0,0,0,0});
|
|
|
- HttpURLConnection connection = mock(HttpURLConnection.class);
|
|
|
-
|
|
|
- Counters.Counter allErrs = mock(Counters.Counter.class);
|
|
|
- when(r.getCounter(anyString(), anyString()))
|
|
|
- .thenReturn(allErrs);
|
|
|
-
|
|
|
+
|
|
|
Fetcher<Text,Text> underTest = new FakeFetcher<Text,Text>(job, id, ss, mm,
|
|
|
r, metrics, except, key, connection);
|
|
|
-
|
|
|
|
|
|
- MapHost host = new MapHost("localhost", "http://localhost:8080/");
|
|
|
-
|
|
|
- ArrayList<TaskAttemptID> maps = new ArrayList<TaskAttemptID>(1);
|
|
|
- TaskAttemptID map1ID = TaskAttemptID.forName("attempt_0_1_m_1_1");
|
|
|
- maps.add(map1ID);
|
|
|
- TaskAttemptID map2ID = TaskAttemptID.forName("attempt_0_1_m_2_1");
|
|
|
- maps.add(map2ID);
|
|
|
- when(ss.getMapsForHost(host)).thenReturn(maps);
|
|
|
- String encHash = "vFE234EIFCiBgYs2tCXY/SjT8Kg=";
|
|
|
String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key);
|
|
|
|
|
|
when(connection.getResponseCode()).thenReturn(200);
|
|
@@ -366,4 +229,191 @@ public class TestFetcher {
|
|
|
encHash);
|
|
|
verify(ss, times(1)).copyFailed(map1ID, host, true, false);
|
|
|
}
|
|
|
+
|
|
|
+ @Test(timeout=10000)
|
|
|
+ public void testInterruptInMemory() throws Exception {
|
|
|
+ final int FETCHER = 2;
|
|
|
+ InMemoryMapOutput<Text,Text> immo = spy(new InMemoryMapOutput<Text,Text>(
|
|
|
+ job, id, mm, 100, null, true));
|
|
|
+ when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt()))
|
|
|
+ .thenReturn(immo);
|
|
|
+ doNothing().when(mm).waitForResource();
|
|
|
+ when(ss.getHost()).thenReturn(host);
|
|
|
+
|
|
|
+ String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key);
|
|
|
+ when(connection.getResponseCode()).thenReturn(200);
|
|
|
+ when(connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH))
|
|
|
+ .thenReturn(replyHash);
|
|
|
+ ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 10, 10, 1);
|
|
|
+ ByteArrayOutputStream bout = new ByteArrayOutputStream();
|
|
|
+ header.write(new DataOutputStream(bout));
|
|
|
+ final StuckInputStream in =
|
|
|
+ new StuckInputStream(new ByteArrayInputStream(bout.toByteArray()));
|
|
|
+ when(connection.getInputStream()).thenReturn(in);
|
|
|
+ doAnswer(new Answer<Void>() {
|
|
|
+ public Void answer(InvocationOnMock ignore) throws IOException {
|
|
|
+ in.close();
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ }).when(connection).disconnect();
|
|
|
+
|
|
|
+ Fetcher<Text,Text> underTest = new FakeFetcher<Text,Text>(job, id, ss, mm,
|
|
|
+ r, metrics, except, key, connection, FETCHER);
|
|
|
+ underTest.start();
|
|
|
+ // wait for read in inputstream
|
|
|
+ in.waitForFetcher();
|
|
|
+ underTest.shutDown();
|
|
|
+ underTest.join(); // rely on test timeout to kill if stuck
|
|
|
+
|
|
|
+ assertTrue(in.wasClosedProperly());
|
|
|
+ verify(immo).abort();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test(timeout=10000)
|
|
|
+ public void testInterruptOnDisk() throws Exception {
|
|
|
+ final int FETCHER = 7;
|
|
|
+ Path p = new Path("file:///tmp/foo");
|
|
|
+ Path pTmp = OnDiskMapOutput.getTempPath(p, FETCHER);
|
|
|
+ FileSystem mFs = mock(FileSystem.class, RETURNS_DEEP_STUBS);
|
|
|
+ MapOutputFile mof = mock(MapOutputFile.class);
|
|
|
+ when(mof.getInputFileForWrite(any(TaskID.class), anyLong())).thenReturn(p);
|
|
|
+ OnDiskMapOutput<Text,Text> odmo = spy(new OnDiskMapOutput<Text,Text>(map1ID,
|
|
|
+ id, mm, 100L, job, mof, FETCHER, true, mFs, p));
|
|
|
+ when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt()))
|
|
|
+ .thenReturn(odmo);
|
|
|
+ doNothing().when(mm).waitForResource();
|
|
|
+ when(ss.getHost()).thenReturn(host);
|
|
|
+
|
|
|
+ String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key);
|
|
|
+ when(connection.getResponseCode()).thenReturn(200);
|
|
|
+ when(connection.getHeaderField(
|
|
|
+ SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH)).thenReturn(replyHash);
|
|
|
+ ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 10, 10, 1);
|
|
|
+ ByteArrayOutputStream bout = new ByteArrayOutputStream();
|
|
|
+ header.write(new DataOutputStream(bout));
|
|
|
+ final StuckInputStream in =
|
|
|
+ new StuckInputStream(new ByteArrayInputStream(bout.toByteArray()));
|
|
|
+ when(connection.getInputStream()).thenReturn(in);
|
|
|
+ doAnswer(new Answer<Void>() {
|
|
|
+ public Void answer(InvocationOnMock ignore) throws IOException {
|
|
|
+ in.close();
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ }).when(connection).disconnect();
|
|
|
+
|
|
|
+ Fetcher<Text,Text> underTest = new FakeFetcher<Text,Text>(job, id, ss, mm,
|
|
|
+ r, metrics, except, key, connection, FETCHER);
|
|
|
+ underTest.start();
|
|
|
+ // wait for read in inputstream
|
|
|
+ in.waitForFetcher();
|
|
|
+ underTest.shutDown();
|
|
|
+ underTest.join(); // rely on test timeout to kill if stuck
|
|
|
+
|
|
|
+ assertTrue(in.wasClosedProperly());
|
|
|
+ verify(mFs).create(eq(pTmp));
|
|
|
+ verify(mFs).delete(eq(pTmp), eq(false));
|
|
|
+ verify(odmo).abort();
|
|
|
+ }
|
|
|
+
|
|
|
+ public static class FakeFetcher<K,V> extends Fetcher<K,V> {
|
|
|
+
|
|
|
+ public FakeFetcher(JobConf job, TaskAttemptID reduceId,
|
|
|
+ ShuffleSchedulerImpl<K,V> scheduler, MergeManagerImpl<K,V> merger,
|
|
|
+ Reporter reporter, ShuffleClientMetrics metrics,
|
|
|
+ ExceptionReporter exceptionReporter, SecretKey jobTokenSecret,
|
|
|
+ HttpURLConnection connection) {
|
|
|
+ super(job, reduceId, scheduler, merger, reporter, metrics,
|
|
|
+ exceptionReporter, jobTokenSecret);
|
|
|
+ this.connection = connection;
|
|
|
+ }
|
|
|
+
|
|
|
+ public FakeFetcher(JobConf job, TaskAttemptID reduceId,
|
|
|
+ ShuffleSchedulerImpl<K,V> scheduler, MergeManagerImpl<K,V> merger,
|
|
|
+ Reporter reporter, ShuffleClientMetrics metrics,
|
|
|
+ ExceptionReporter exceptionReporter, SecretKey jobTokenSecret,
|
|
|
+ HttpURLConnection connection, int id) {
|
|
|
+ super(job, reduceId, scheduler, merger, reporter, metrics,
|
|
|
+ exceptionReporter, jobTokenSecret, id);
|
|
|
+ this.connection = connection;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected void openConnection(URL url) throws IOException {
|
|
|
+ if (null == connection) {
|
|
|
+ super.openConnection(url);
|
|
|
+ }
|
|
|
+ // already 'opened' the mocked connection
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ static class StuckInputStream extends FilterInputStream {
|
|
|
+
|
|
|
+ boolean stuck = false;
|
|
|
+ volatile boolean closed = false;
|
|
|
+
|
|
|
+ StuckInputStream(InputStream inner) {
|
|
|
+ super(inner);
|
|
|
+ }
|
|
|
+
|
|
|
+ int freeze() throws IOException {
|
|
|
+ synchronized (this) {
|
|
|
+ stuck = true;
|
|
|
+ notify();
|
|
|
+ }
|
|
|
+ // connection doesn't throw InterruptedException, but may return some
|
|
|
+ // bytes geq 0 or throw an exception
|
|
|
+ while (!Thread.currentThread().isInterrupted() || closed) {
|
|
|
+ // spin
|
|
|
+ if (closed) {
|
|
|
+ throw new IOException("underlying stream closed, triggered an error");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public int read() throws IOException {
|
|
|
+ int ret = super.read();
|
|
|
+ if (ret != -1) {
|
|
|
+ return ret;
|
|
|
+ }
|
|
|
+ return freeze();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public int read(byte[] b) throws IOException {
|
|
|
+ int ret = super.read(b);
|
|
|
+ if (ret != -1) {
|
|
|
+ return ret;
|
|
|
+ }
|
|
|
+ return freeze();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public int read(byte[] b, int off, int len) throws IOException {
|
|
|
+ int ret = super.read(b, off, len);
|
|
|
+ if (ret != -1) {
|
|
|
+ return ret;
|
|
|
+ }
|
|
|
+ return freeze();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void close() throws IOException {
|
|
|
+ closed = true;
|
|
|
+ }
|
|
|
+
|
|
|
+ public synchronized void waitForFetcher() throws InterruptedException {
|
|
|
+ while (!stuck) {
|
|
|
+ wait();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public boolean wasClosedProperly() {
|
|
|
+ return closed;
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
}
|