|
@@ -34,6 +34,7 @@ import static org.junit.Assert.assertNull;
|
|
|
import static org.junit.Assert.assertNotNull;
|
|
|
import static org.junit.Assert.assertSame;
|
|
|
import static org.junit.Assert.assertTrue;
|
|
|
+import static org.junit.Assert.fail;
|
|
|
|
|
|
import java.io.File;
|
|
|
import java.io.IOException;
|
|
@@ -47,6 +48,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
|
|
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
|
|
+
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
@@ -285,21 +287,18 @@ public class TestBPOfferService {
|
|
|
int totalTestBlocks = 4000;
|
|
|
Thread addNewBlockThread = null;
|
|
|
final AtomicInteger count = new AtomicInteger(0);
|
|
|
-
|
|
|
+ DataNodeFaultInjector prevDNFaultInjector = null;
|
|
|
try {
|
|
|
waitForBothActors(bpos);
|
|
|
waitForInitialization(bpos);
|
|
|
+ prevDNFaultInjector = DataNodeFaultInjector.get();
|
|
|
DataNodeFaultInjector.set(new DataNodeFaultInjector() {
|
|
|
public void blockUtilSendFullBlockReport() {
|
|
|
try {
|
|
|
- GenericTestUtils.waitFor(() -> {
|
|
|
- if(count.get() > 2000) {
|
|
|
- return true;
|
|
|
- }
|
|
|
- return false;
|
|
|
- }, 100, 1000);
|
|
|
+ GenericTestUtils.waitFor(() -> count.get() > 2000,
|
|
|
+ 100, 1000);
|
|
|
} catch (Exception e) {
|
|
|
- e.printStackTrace();
|
|
|
+ LOG.error("error DataNodeFaultInjector", e);
|
|
|
}
|
|
|
}
|
|
|
});
|
|
@@ -318,45 +317,41 @@ public class TestBPOfferService {
|
|
|
count.addAndGet(1);
|
|
|
Thread.sleep(1);
|
|
|
} catch (Exception e) {
|
|
|
- e.printStackTrace();
|
|
|
+ LOG.error("error addNewBlockThread", e);
|
|
|
}
|
|
|
}
|
|
|
});
|
|
|
addNewBlockThread.start();
|
|
|
|
|
|
// Make sure that generate blocks for DataNode and IBR not empty now.
|
|
|
- GenericTestUtils.waitFor(() -> {
|
|
|
- if(count.get() > 0) {
|
|
|
- return true;
|
|
|
- }
|
|
|
- return false;
|
|
|
- }, 100, 1000);
|
|
|
+ GenericTestUtils.waitFor(() -> count.get() > 0, 100, 1000);
|
|
|
|
|
|
// Trigger re-register using DataNode Command.
|
|
|
datanodeCommands[0] = new DatanodeCommand[]{RegisterCommand.REGISTER};
|
|
|
- bpos.triggerHeartbeatForTests();
|
|
|
|
|
|
+ bpos.triggerHeartbeatForTests();
|
|
|
+ addNewBlockThread.join();
|
|
|
+ addNewBlockThread = null;
|
|
|
+ // Verify FBR/IBR count is equal to generate number.
|
|
|
try {
|
|
|
- GenericTestUtils.waitFor(() -> {
|
|
|
- if(fullBlockReportCount == totalTestBlocks ||
|
|
|
- incrBlockReportCount == totalTestBlocks) {
|
|
|
- return true;
|
|
|
- }
|
|
|
- return false;
|
|
|
- }, 1000, 15000);
|
|
|
- } catch (Exception e) {}
|
|
|
+ GenericTestUtils.waitFor(() ->
|
|
|
+ (fullBlockReportCount == totalTestBlocks ||
|
|
|
+ incrBlockReportCount == totalTestBlocks), 1000, 15000);
|
|
|
+ } catch (Exception e) {
|
|
|
+ fail(String.format("Timed out wait for IBR counts FBRCount = %d,"
|
|
|
+ + " IBRCount = %d; expected = %d. Exception: %s",
|
|
|
+ fullBlockReportCount, incrBlockReportCount, totalTestBlocks,
|
|
|
+ e.getMessage()));
|
|
|
+ }
|
|
|
|
|
|
- // Verify FBR/IBR count is equal to generate number.
|
|
|
- assertTrue(fullBlockReportCount == totalTestBlocks ||
|
|
|
- incrBlockReportCount == totalTestBlocks);
|
|
|
} finally {
|
|
|
- addNewBlockThread.join();
|
|
|
+ if (addNewBlockThread != null) {
|
|
|
+ addNewBlockThread.interrupt();
|
|
|
+ }
|
|
|
bpos.stop();
|
|
|
bpos.join();
|
|
|
|
|
|
- DataNodeFaultInjector.set(new DataNodeFaultInjector() {
|
|
|
- public void blockUtilSendFullBlockReport() {}
|
|
|
- });
|
|
|
+ DataNodeFaultInjector.set(prevDNFaultInjector);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -1216,4 +1211,4 @@ public class TestBPOfferService {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
-}
|
|
|
+}
|