|
@@ -81,10 +81,11 @@ public class DistributedFSCheck extends TestCase {
|
|
|
public void testFSBlocks( String rootName ) throws Exception {
|
|
|
createInputFile(rootName);
|
|
|
runDistributedFSCheck();
|
|
|
+ cleanup(); // clean up after all to restore the system state
|
|
|
}
|
|
|
|
|
|
private void createInputFile( String rootName ) throws IOException {
|
|
|
- fs.delete(MAP_INPUT_DIR);
|
|
|
+ cleanup(); // clean up if previous run failed
|
|
|
|
|
|
Path inputFile = new Path(MAP_INPUT_DIR, "in_file");
|
|
|
SequenceFile.Writer writer =
|
|
@@ -133,18 +134,22 @@ public class DistributedFSCheck extends TestCase {
|
|
|
long offset
|
|
|
) throws IOException {
|
|
|
// open file
|
|
|
- DataInputStream in;
|
|
|
- in = new DataInputStream(fs.open(new Path(name)));
|
|
|
+ FSDataInputStream in = null;
|
|
|
+ try {
|
|
|
+ in = fs.open(new Path(name));
|
|
|
+ } catch( IOException e ) {
|
|
|
+ return name + "@(missing)";
|
|
|
+ }
|
|
|
+ in.seek( offset );
|
|
|
long actualSize = 0;
|
|
|
try {
|
|
|
long blockSize = fs.getDefaultBlockSize();
|
|
|
- int curSize = bufferSize;
|
|
|
- for( actualSize = 0;
|
|
|
+ reporter.setStatus( "reading " + name + "@" +
|
|
|
+ offset + "/" + blockSize );
|
|
|
+ for( int curSize = bufferSize;
|
|
|
curSize == bufferSize && actualSize < blockSize;
|
|
|
actualSize += curSize) {
|
|
|
- curSize = in.read( buffer, (int)offset, Math.min(bufferSize, (int)(blockSize - actualSize)) );
|
|
|
- reporter.setStatus( "reading " + name + "@" +
|
|
|
- offset + "/" + blockSize );
|
|
|
+ curSize = in.read( buffer, 0, bufferSize );
|
|
|
}
|
|
|
} catch( IOException e ) {
|
|
|
LOG.info( "Corrupted block detected in \"" + name + "\" at " + offset );
|
|
@@ -178,7 +183,6 @@ public class DistributedFSCheck extends TestCase {
|
|
|
}
|
|
|
|
|
|
private void runDistributedFSCheck() throws Exception {
|
|
|
- fs.delete(READ_DIR);
|
|
|
JobConf job = new JobConf( fs.getConf(), DistributedFSCheck.class );
|
|
|
|
|
|
job.setInputPath(MAP_INPUT_DIR);
|
|
@@ -240,6 +244,7 @@ public class DistributedFSCheck extends TestCase {
|
|
|
long execTime = System.currentTimeMillis() - tStart;
|
|
|
|
|
|
test.analyzeResult( execTime, resFileName, viewStats );
|
|
|
+ // test.cleanup(); // clean up after all to restore the system state
|
|
|
}
|
|
|
|
|
|
private void analyzeResult( long execTime,
|
|
@@ -318,7 +323,7 @@ public class DistributedFSCheck extends TestCase {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private void cleanup() throws Exception {
|
|
|
+ private void cleanup() throws IOException {
|
|
|
LOG.info( "Cleaning up test files" );
|
|
|
fs.delete(TEST_ROOT_DIR);
|
|
|
}
|