|
@@ -338,9 +338,97 @@ public class TestEditLogTailer {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ @Test
|
|
|
+ public void testStandbyTriggersLogRollsWhenTailInProgressEdits()
|
|
|
+ throws Exception {
|
|
|
+ // Time in seconds to wait for standby to catch up to edits from active
|
|
|
+ final int standbyCatchupWaitTime = 2;
|
|
|
+ // Time in seconds to wait before checking if edit logs are rolled while
|
|
|
+ // expecting no edit log roll
|
|
|
+ final int noLogRollWaitTime = 2;
|
|
|
+ // Time in seconds to wait before checking if edit logs are rolled while
|
|
|
+ // expecting edit log roll
|
|
|
+ final int logRollWaitTime = 3;
|
|
|
+
|
|
|
+ Configuration conf = getConf();
|
|
|
+ conf.setInt(DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY,
|
|
|
+ standbyCatchupWaitTime + noLogRollWaitTime + 1);
|
|
|
+ conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
|
|
|
+ conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true);
|
|
|
+
|
|
|
+ MiniDFSCluster cluster = createMiniDFSCluster(conf, 2);
|
|
|
+ if (cluster == null) {
|
|
|
+ fail("failed to start mini cluster.");
|
|
|
+ }
|
|
|
+
|
|
|
+ try {
|
|
|
+ int activeIndex = new Random().nextBoolean() ? 1 : 0;
|
|
|
+ int standbyIndex = (activeIndex == 0) ? 1 : 0;
|
|
|
+ cluster.transitionToActive(activeIndex);
|
|
|
+ NameNode active = cluster.getNameNode(activeIndex);
|
|
|
+ NameNode standby = cluster.getNameNode(standbyIndex);
|
|
|
+
|
|
|
+ long origTxId = active.getNamesystem().getFSImage().getEditLog()
|
|
|
+ .getCurSegmentTxId();
|
|
|
+ for (int i = 0; i < DIRS_TO_MAKE / 2; i++) {
|
|
|
+ NameNodeAdapter.mkdirs(active, getDirPath(i),
|
|
|
+ new PermissionStatus("test", "test",
|
|
|
+ new FsPermission((short)00755)), true);
|
|
|
+ }
|
|
|
+
|
|
|
+ long activeTxId = active.getNamesystem().getFSImage().getEditLog()
|
|
|
+ .getLastWrittenTxId();
|
|
|
+ waitForStandbyToCatchUpWithInProgressEdits(standby, activeTxId,
|
|
|
+ standbyCatchupWaitTime);
|
|
|
+
|
|
|
+ for (int i = DIRS_TO_MAKE / 2; i < DIRS_TO_MAKE; i++) {
|
|
|
+ NameNodeAdapter.mkdirs(active, getDirPath(i),
|
|
|
+ new PermissionStatus("test", "test",
|
|
|
+ new FsPermission((short)00755)), true);
|
|
|
+ }
|
|
|
+
|
|
|
+ boolean exceptionThrown = false;
|
|
|
+ try {
|
|
|
+ checkForLogRoll(active, origTxId, noLogRollWaitTime);
|
|
|
+ } catch (TimeoutException e) {
|
|
|
+ exceptionThrown = true;
|
|
|
+ }
|
|
|
+ assertTrue(exceptionThrown);
|
|
|
+
|
|
|
+ checkForLogRoll(active, origTxId, logRollWaitTime);
|
|
|
+ } finally {
|
|
|
+ cluster.shutdown();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private static void waitForStandbyToCatchUpWithInProgressEdits(
|
|
|
+ final NameNode standby, final long activeTxId,
|
|
|
+ int maxWaitSec) throws Exception {
|
|
|
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
|
|
+ @Override
|
|
|
+ public Boolean get() {
|
|
|
+ long standbyTxId = standby.getNamesystem().getFSImage()
|
|
|
+ .getLastAppliedTxId();
|
|
|
+ return (standbyTxId >= activeTxId);
|
|
|
+ }
|
|
|
+ }, 100, maxWaitSec * 1000);
|
|
|
+ }
|
|
|
+
|
|
|
+ private static void checkForLogRoll(final NameNode active,
|
|
|
+ final long origTxId, int maxWaitSec) throws Exception {
|
|
|
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
|
|
+ @Override
|
|
|
+ public Boolean get() {
|
|
|
+ long curSegmentTxId = active.getNamesystem().getFSImage().getEditLog()
|
|
|
+ .getCurSegmentTxId();
|
|
|
+ return (origTxId != curSegmentTxId);
|
|
|
+ }
|
|
|
+ }, 100, maxWaitSec * 1000);
|
|
|
+ }
|
|
|
+
|
|
|
private static MiniDFSCluster createMiniDFSCluster(Configuration conf,
|
|
|
int nnCount) throws IOException {
|
|
|
- int basePort = 10060 + new Random().nextInt(100) * 2;
|
|
|
+ int basePort = 10060 + new Random().nextInt(1000) * 2;
|
|
|
|
|
|
// By passing in basePort, name node will have IPC port set,
|
|
|
// which is needed for enabling roll log.
|