|
@@ -24,6 +24,7 @@ import java.lang.Void;
|
|
|
|
|
|
import java.net.HttpURLConnection;
|
|
|
|
|
|
+import org.apache.hadoop.fs.ChecksumException;
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.mapred.MapOutputFile;
|
|
@@ -54,6 +55,7 @@ import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.io.Text;
|
|
|
import org.apache.hadoop.mapred.Counters;
|
|
|
+import org.apache.hadoop.mapred.IFileInputStream;
|
|
|
import org.apache.hadoop.mapred.IFileOutputStream;
|
|
|
import org.apache.hadoop.mapred.JobConf;
|
|
|
import org.apache.hadoop.mapred.Reporter;
|
|
@@ -88,6 +90,7 @@ public class TestFetcher {
|
|
|
final MapHost host = new MapHost("localhost", "http://localhost:8080/");
|
|
|
final TaskAttemptID map1ID = TaskAttemptID.forName("attempt_0_1_m_1_1");
|
|
|
final TaskAttemptID map2ID = TaskAttemptID.forName("attempt_0_1_m_2_1");
|
|
|
+ FileSystem fs = null; // assigned by tests that write to disk; when non-null, teardown() deletes the test's directory from it
|
|
|
|
|
|
@Rule public TestName name = new TestName();
|
|
|
|
|
@@ -118,8 +121,11 @@ public class TestFetcher {
|
|
|
}
|
|
|
|
|
|
+ /**
+  * Logs the end of the current test and, when a test has assigned
+  * {@code fs}, recursively deletes the directory named after the test
+  * method from that file system.
+  *
+  * @throws IOException if the delete fails
+  */
@After
- public void teardown() {
+ public void teardown() throws IOException {
LOG.info("<<<< " + name.getMethodName());
+   if (fs != null) {
+     // Tests that write to disk place their files under a directory named
+     // after the test method, so removing that directory cleans up after them.
+     fs.delete(new Path(name.getMethodName()), true);
+   }
}
|
|
|
|
|
|
@Test
|
|
@@ -432,6 +438,70 @@ public class TestFetcher {
|
|
|
verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map2ID));
|
|
|
}
|
|
|
|
|
|
+ /**
+  * Verifies that {@code OnDiskMapOutput.shuffle} accepts an intact IFile
+  * segment, fails with a {@code ChecksumException} when one byte of that
+  * segment has been altered, and that the file shuffled to disk can still be
+  * opened and read afterwards.
+  */
+ @Test
+ public void testCorruptedIFile() throws Exception {
+   final int fetcher = 7;
+   Path onDiskMapOutputPath = new Path(name.getMethodName() + "/foo");
+   Path shuffledToDisk =
+       OnDiskMapOutput.getTempPath(onDiskMapOutputPath, fetcher);
+   // Assign the field rather than a local so teardown() can remove the
+   // directory this test writes into.
+   fs = FileSystem.getLocal(job).getRaw();
+   MapOutputFile mof = mock(MapOutputFile.class);
+   OnDiskMapOutput<Text,Text> odmo = new OnDiskMapOutput<Text,Text>(map1ID,
+       id, mm, 100L, job, mof, fetcher, true, fs, onDiskMapOutputPath);
+
+   String mapData = "MAPDATA12345678901234567890";
+
+   // Build the input in memory: the shuffle header first, then the map data
+   // written through an IFileOutputStream that wraps the same stream.
+   ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 14, 10, 1);
+   ByteArrayOutputStream bout = new ByteArrayOutputStream();
+   DataOutputStream dos = new DataOutputStream(bout);
+   IFileOutputStream ios = new IFileOutputStream(dos);
+   header.write(dos);
+
+   int headerSize = dos.size();
+   try {
+     // Name the charset explicitly so the bytes do not depend on the
+     // platform default encoding.
+     ios.write(mapData.getBytes("UTF-8"));
+   } finally {
+     ios.close();
+   }
+
+   // Number of bytes that follow the header in the buffer.
+   int dataSize = bout.size() - headerSize;
+
+   // Ensure that the OnDiskMapOutput shuffler can successfully read the data.
+   // Named testHost so it does not shadow the class-level host field.
+   MapHost testHost = new MapHost("TestHost", "http://test/url");
+   ByteArrayInputStream bin = new ByteArrayInputStream(bout.toByteArray());
+   try {
+     // Read past the shuffle header.
+     bin.read(new byte[headerSize], 0, headerSize);
+     odmo.shuffle(testHost, bin, dataSize, dataSize, metrics, Reporter.NULL);
+   } finally {
+     bin.close();
+   }
+
+   // Now corrupt the IFile data. Flipping every bit of a byte in the middle
+   // of the data changes it whatever its original value was; overwriting it
+   // with a fixed value would be a no-op if the byte already held that value.
+   byte[] corrupted = bout.toByteArray();
+   corrupted[headerSize + (dataSize / 2)] ^= 0xFF;
+
+   bin = new ByteArrayInputStream(corrupted);
+   try {
+     // Read past the shuffle header.
+     bin.read(new byte[headerSize], 0, headerSize);
+     odmo.shuffle(testHost, bin, dataSize, dataSize, metrics, Reporter.NULL);
+     fail("OnDiskMapOutput.shuffle didn't detect the corrupted map partition file");
+   } catch (ChecksumException e) {
+     LOG.info("The expected checksum exception was thrown.", e);
+   } finally {
+     bin.close();
+   }
+
+   // Ensure that the shuffled file can be read.
+   IFileInputStream iFin = new IFileInputStream(fs.open(shuffledToDisk), dataSize, job);
+   try {
+     iFin.read(new byte[dataSize], 0, dataSize);
+   } finally {
+     iFin.close();
+   }
+ }
|
|
|
+
|
|
|
@Test(timeout=10000)
|
|
|
public void testInterruptInMemory() throws Exception {
|
|
|
final int FETCHER = 2;
|