@@ -0,0 +1,982 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.client;
+
+import static org.junit.Assert.*;
+import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.JID;
+import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.FAKE_NSINFO;
+import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.writeSegment;
+import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.writeTxns;
+import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.verifyEdits;
+import static org.apache.hadoop.hdfs.qjournal.client.TestQuorumJournalManagerUnit.futureThrows;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
+import org.apache.hadoop.hdfs.qjournal.QJMTestUtil;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
+import org.apache.hadoop.hdfs.qjournal.server.JournalFaultInjector;
+import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
+import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
+import org.apache.hadoop.hdfs.server.namenode.FileJournalManager;
+import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.stubbing.Stubber;
+
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.MoreExecutors;
+
+/**
+ * Functional tests for QuorumJournalManager.
+ * For true unit tests, see {@link TestQuorumJournalManagerUnit}.
+ */
+public class TestQuorumJournalManager {
+  private static final Log LOG = LogFactory.getLog(
+      TestQuorumJournalManager.class);
+
+  private MiniJournalCluster cluster;
+  private Configuration conf;
+  private QuorumJournalManager qjm;
+  private List<AsyncLogger> spies;
+
+  static {
+    ((Log4JLogger)ProtobufRpcEngine.LOG).getLogger().setLevel(Level.ALL);
+  }
+
+  @Before
+  public void setup() throws Exception {
+    conf = new Configuration();
+    // Don't retry connections - it just slows down the tests.
+    conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
+
+    cluster = new MiniJournalCluster.Builder(conf)
+        .build();
+
+    qjm = createSpyingQJM();
+    spies = qjm.getLoggerSetForTests().getLoggersForTests();
+
+    qjm.format(QJMTestUtil.FAKE_NSINFO);
+    qjm.recoverUnfinalizedSegments();
+    assertEquals(1, qjm.getLoggerSetForTests().getEpoch());
+  }
+
+  @After
+  public void shutdown() throws IOException {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testSingleWriter() throws Exception {
+    writeSegment(cluster, qjm, 1, 3, true);
+
+    // Should be finalized
+    checkRecovery(cluster, 1, 3);
+
+    // Start a new segment
+    writeSegment(cluster, qjm, 4, 1, true);
+
+    // Should be finalized
+    checkRecovery(cluster, 4, 4);
+ }
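+
+  // The writeSegment() helper from QJMTestUtil is used throughout. A sketch
+  // of its contract as exercised above, assuming (firstTxId, numTxns,
+  // finalize) parameter semantics:
+  //
+  //   writeSegment(cluster, qjm, 1, 3, true); // writes txids 1..3, finalized
+  //   writeSegment(cluster, qjm, 4, 1, true); // writes txid 4, finalized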
+
+  @Test
+  public void testFormat() throws Exception {
+    QuorumJournalManager qjm = new QuorumJournalManager(
+        conf, cluster.getQuorumJournalURI("testFormat-jid"), FAKE_NSINFO);
+    assertFalse(qjm.hasSomeData());
+    qjm.format(FAKE_NSINFO);
+    assertTrue(qjm.hasSomeData());
+  }
+
+  @Test
+  public void testReaderWhileAnotherWrites() throws Exception {
+
+    QuorumJournalManager readerQjm = createSpyingQJM();
+    List<EditLogInputStream> streams = Lists.newArrayList();
+    readerQjm.selectInputStreams(streams, 0, false);
+    assertEquals(0, streams.size());
+    writeSegment(cluster, qjm, 1, 3, true);
+
+    readerQjm.selectInputStreams(streams, 0, false);
+    try {
+      assertEquals(1, streams.size());
+      // Validate the actual stream contents.
+      EditLogInputStream stream = streams.get(0);
+      assertEquals(1, stream.getFirstTxId());
+      assertEquals(3, stream.getLastTxId());
+
+      verifyEdits(streams, 1, 3);
+      assertNull(stream.readOp());
+    } finally {
+      IOUtils.cleanup(LOG, streams.toArray(new Closeable[0]));
+      streams.clear();
+    }
+
+    // Ensure correct results when there is a stream in-progress, but we don't
+    // ask for in-progress.
+    writeSegment(cluster, qjm, 4, 3, false);
+    readerQjm.selectInputStreams(streams, 0, false);
+    try {
+      assertEquals(1, streams.size());
+      EditLogInputStream stream = streams.get(0);
+      assertEquals(1, stream.getFirstTxId());
+      assertEquals(3, stream.getLastTxId());
+      verifyEdits(streams, 1, 3);
+    } finally {
+      IOUtils.cleanup(LOG, streams.toArray(new Closeable[0]));
+      streams.clear();
+    }
+
+    // TODO: check results for selectInputStreams with inProgressOK = true.
+    // This doesn't currently work, due to a bug where RedundantEditLogInputStream
+    // throws an exception if there are any unvalidated in-progress edits in the list!
+    // But, it shouldn't be necessary for current use cases.
+
+    qjm.finalizeLogSegment(4, 6);
+    readerQjm.selectInputStreams(streams, 0, false);
+    try {
+      assertEquals(2, streams.size());
+      assertEquals(4, streams.get(1).getFirstTxId());
+      assertEquals(6, streams.get(1).getLastTxId());
+
+      verifyEdits(streams, 1, 6);
+    } finally {
+      IOUtils.cleanup(LOG, streams.toArray(new Closeable[0]));
+      streams.clear();
+    }
+  }
+
+  /**
+   * Regression test for HDFS-3725. One of the journal nodes is down
+   * during the writing of one segment, then comes back up later to
+   * take part in a later segment. Thus, its local edits are
+   * not a contiguous sequence. This should be handled correctly.
+   */
+  @Test
+  public void testOneJNMissingSegments() throws Exception {
+    writeSegment(cluster, qjm, 1, 3, true);
+    waitForAllPendingCalls(qjm.getLoggerSetForTests());
+    cluster.getJournalNode(0).stopAndJoin(0);
+    writeSegment(cluster, qjm, 4, 3, true);
+    waitForAllPendingCalls(qjm.getLoggerSetForTests());
+    cluster.restartJournalNode(0);
+    writeSegment(cluster, qjm, 7, 3, true);
+    waitForAllPendingCalls(qjm.getLoggerSetForTests());
+    cluster.getJournalNode(1).stopAndJoin(0);
+
+    QuorumJournalManager readerQjm = createSpyingQJM();
+    List<EditLogInputStream> streams = Lists.newArrayList();
+    try {
+      readerQjm.selectInputStreams(streams, 1, false);
+      verifyEdits(streams, 1, 9);
+    } finally {
+      IOUtils.cleanup(LOG, streams.toArray(new Closeable[0]));
+      readerQjm.close();
+    }
+  }
+
+  /**
+   * Regression test for HDFS-3891: selectInputStreams should throw
+   * an exception when a majority of journalnodes have crashed.
+   */
+  @Test
+  public void testSelectInputStreamsMajorityDown() throws Exception {
+    // Shut down all of the JNs.
+    cluster.shutdown();
+
+    List<EditLogInputStream> streams = Lists.newArrayList();
+    try {
+      qjm.selectInputStreams(streams, 0, false);
+      fail("Did not throw IOE");
+    } catch (QuorumException ioe) {
+      GenericTestUtils.assertExceptionContains(
+          "Got too many exceptions", ioe);
+      assertTrue(streams.isEmpty());
+    }
+ }
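+
+  // Illustrative note: with every JN down, no call can gather the majority
+  // quorum of acks (for 3 nodes, 3 / 2 + 1 == 2), so the client fails fast
+  // with a QuorumException (an IOException subclass) carrying the
+  // "Got too many exceptions" message asserted above.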
+
+  /**
+   * Test the case where the NN crashes after starting a new segment
+   * on all nodes, but before writing the first transaction to it.
+   */
+  @Test
+  public void testCrashAtBeginningOfSegment() throws Exception {
+    writeSegment(cluster, qjm, 1, 3, true);
+    waitForAllPendingCalls(qjm.getLoggerSetForTests());
+
+    EditLogOutputStream stm = qjm.startLogSegment(4);
+    try {
+      waitForAllPendingCalls(qjm.getLoggerSetForTests());
+    } finally {
+      stm.abort();
+    }
+
+
+    // Make a new QJM
+    qjm = new QuorumJournalManager(
+        conf, cluster.getQuorumJournalURI(JID), FAKE_NSINFO);
+    qjm.recoverUnfinalizedSegments();
+    checkRecovery(cluster, 1, 3);
+
+    writeSegment(cluster, qjm, 4, 3, true);
+  }
+
+  @Test
+  public void testOutOfSyncAtBeginningOfSegment0() throws Exception {
+    doTestOutOfSyncAtBeginningOfSegment(0);
+  }
+
+  @Test
+  public void testOutOfSyncAtBeginningOfSegment1() throws Exception {
+    doTestOutOfSyncAtBeginningOfSegment(1);
+  }
+
+  @Test
+  public void testOutOfSyncAtBeginningOfSegment2() throws Exception {
+    doTestOutOfSyncAtBeginningOfSegment(2);
+  }
+
+  /**
+   * Test the case where, at the beginning of a segment, transactions
+   * have been written to one JN but not others.
+   */
+  public void doTestOutOfSyncAtBeginningOfSegment(int nodeWithOneTxn)
+      throws Exception {
+
+    int nodeWithEmptySegment = (nodeWithOneTxn + 1) % 3;
+    int nodeMissingSegment = (nodeWithOneTxn + 2) % 3;
+
+    writeSegment(cluster, qjm, 1, 3, true);
+    waitForAllPendingCalls(qjm.getLoggerSetForTests());
+    cluster.getJournalNode(nodeMissingSegment).stopAndJoin(0);
+
+    // Open segment on 2/3 nodes
+    EditLogOutputStream stm = qjm.startLogSegment(4);
+    try {
+      waitForAllPendingCalls(qjm.getLoggerSetForTests());
+
+      // Write transactions to only 1/3 nodes
+      failLoggerAtTxn(spies.get(nodeWithEmptySegment), 4);
+      try {
+        writeTxns(stm, 4, 1);
+        fail("Did not fail even though 2/3 failed");
+      } catch (QuorumException qe) {
+        GenericTestUtils.assertExceptionContains("mock failure", qe);
+      }
+    } finally {
+      stm.abort();
+    }
+
+    // Bring back the down JN.
+    cluster.restartJournalNode(nodeMissingSegment);
+
+    // Make a new QJM. At this point, the state is as follows:
+    // A: nodeWithEmptySegment: 1-3 finalized, 4_inprogress (empty)
+    // B: nodeWithOneTxn: 1-3 finalized, 4_inprogress (1 txn)
+    // C: nodeMissingSegment: 1-3 finalized
+    GenericTestUtils.assertGlobEquals(
+        cluster.getCurrentDir(nodeWithEmptySegment, JID),
+        "edits_.*",
+        NNStorage.getFinalizedEditsFileName(1, 3),
+        NNStorage.getInProgressEditsFileName(4));
+    GenericTestUtils.assertGlobEquals(
+        cluster.getCurrentDir(nodeWithOneTxn, JID),
+        "edits_.*",
+        NNStorage.getFinalizedEditsFileName(1, 3),
+        NNStorage.getInProgressEditsFileName(4));
+    GenericTestUtils.assertGlobEquals(
+        cluster.getCurrentDir(nodeMissingSegment, JID),
+        "edits_.*",
+        NNStorage.getFinalizedEditsFileName(1, 3));
+
+
+    // Stop one of the nodes. Since we run this test three
+    // times, rotating the roles of the nodes, we'll test
+    // all the permutations.
+    cluster.getJournalNode(2).stopAndJoin(0);
+
+    qjm = createSpyingQJM();
+    qjm.recoverUnfinalizedSegments();
+
+    if (nodeWithOneTxn == 0 ||
+        nodeWithOneTxn == 1) {
+      // If the node that had the transaction committed was one of the nodes
+      // that responded during recovery, then we should have recovered txid
+      // 4.
+      checkRecovery(cluster, 4, 4);
+      writeSegment(cluster, qjm, 5, 3, true);
+    } else {
+      // Otherwise, we should have recovered only 1-3 and should be able to
+      // start a segment at 4.
+      checkRecovery(cluster, 1, 3);
+      writeSegment(cluster, qjm, 4, 3, true);
+    }
+  }
+
+
+  /**
+   * Test case where a new writer picks up from an old one with no failures
+   * and the previous unfinalized segment entirely consistent -- i.e. all
+   * the JournalNodes end at the same transaction ID.
+   */
+  @Test
+  public void testChangeWritersLogsInSync() throws Exception {
+    writeSegment(cluster, qjm, 1, 3, false);
+    QJMTestUtil.assertExistsInQuorum(cluster,
+        NNStorage.getInProgressEditsFileName(1));
+
+    // Make a new QJM
+    qjm = new QuorumJournalManager(
+        conf, cluster.getQuorumJournalURI(JID), FAKE_NSINFO);
+    qjm.recoverUnfinalizedSegments();
+    checkRecovery(cluster, 1, 3);
+  }
+
+  /**
+   * Test case where a new writer picks up from an old one which crashed
+   * with the three loggers at different txnids.
+   */
+  @Test
+  public void testChangeWritersLogsOutOfSync1() throws Exception {
+    // Journal states:  [3, 4, 5]
+    // During recovery: [x, 4, 5]
+    // Should recover to txn 5
+    doOutOfSyncTest(0, 5L);
+  }
+
+  @Test
+  public void testChangeWritersLogsOutOfSync2() throws Exception {
+    // Journal states:  [3, 4, 5]
+    // During recovery: [3, x, 5]
+    // Should recover to txn 5
+    doOutOfSyncTest(1, 5L);
+  }
+
+  @Test
+  public void testChangeWritersLogsOutOfSync3() throws Exception {
+    // Journal states:  [3, 4, 5]
+    // During recovery: [3, 4, x]
+    // Should recover to txn 4
+    doOutOfSyncTest(2, 4L);
+  }
+
+
+  private void doOutOfSyncTest(int missingOnRecoveryIdx,
+      long expectedRecoveryTxnId) throws Exception {
+    setupLoggers345();
+
+    QJMTestUtil.assertExistsInQuorum(cluster,
+        NNStorage.getInProgressEditsFileName(1));
+
+    // Shut down the specified JN, so it's not present during recovery.
+    cluster.getJournalNode(missingOnRecoveryIdx).stopAndJoin(0);
+
+    // Make a new QJM
+    qjm = createSpyingQJM();
+
+    qjm.recoverUnfinalizedSegments();
+    checkRecovery(cluster, 1, expectedRecoveryTxnId);
+  }
+
+
+  private void failLoggerAtTxn(AsyncLogger spy, long txid) {
+    TestQuorumJournalManagerUnit.futureThrows(new IOException("mock failure"))
+        .when(spy).sendEdits(Mockito.anyLong(),
+            Mockito.eq(txid), Mockito.eq(1), Mockito.<byte[]>any());
+ }
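+
+  // The stubbing above relies on AsyncLogger methods returning futures. A
+  // hedged sketch of what futureThrows(e).when(spy).sendEdits(...) arranges,
+  // assuming standard Mockito spy semantics (hypothetical variables):
+  //
+  //   ListenableFuture<Void> f = spy.sendEdits(segTxId, txid, 1, data);
+  //   f.get(); // throws ExecutionException wrapping the injected IOException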
+
+  /**
+   * Test the case where one of the loggers misses a finalizeLogSegment()
+   * call, and then misses the next startLogSegment() call before coming
+   * back to life.
+   *
+   * Previously, this caused it to keep on writing to the old log segment,
+   * such that one logger had e.g. edits_1-10 while the others had edits_1-5
+   * and edits_6-10. This caused recovery to fail in certain cases.
+   */
+  @Test
+  public void testMissFinalizeAndNextStart() throws Exception {
+
+    // Logger 0: miss finalize(1-3) and start(4)
+    futureThrows(new IOException("injected")).when(spies.get(0))
+        .finalizeLogSegment(Mockito.eq(1L), Mockito.eq(3L));
+    futureThrows(new IOException("injected")).when(spies.get(0))
+        .startLogSegment(Mockito.eq(4L));
+
+    // Logger 1: fail at txn id 4
+    failLoggerAtTxn(spies.get(1), 4L);
+
+    writeSegment(cluster, qjm, 1, 3, true);
+    EditLogOutputStream stm = qjm.startLogSegment(4);
+    try {
+      writeTxns(stm, 4, 1);
+      fail("Did not fail to write");
+    } catch (QuorumException qe) {
+      // Should fail, because logger 1 had an injected fault and
+      // logger 0 should detect writer out of sync
+      GenericTestUtils.assertExceptionContains("Writer out of sync",
+          qe);
+    } finally {
+      stm.abort();
+      qjm.close();
+    }
+
+    // State:
+    // Logger 0: 1-3 in-progress (since it missed finalize)
+    // Logger 1: 1-3 finalized
+    // Logger 2: 1-3 finalized, 4 in-progress with one txn
+
+    // Shut down logger 2 so it doesn't participate in recovery
+    cluster.getJournalNode(2).stopAndJoin(0);
+
+    qjm = createSpyingQJM();
+    long recovered = QJMTestUtil.recoverAndReturnLastTxn(qjm);
+    assertEquals(3L, recovered);
+  }
+
+  /**
+   * edit lengths [3,4,5]
+   * first recovery:
+   * - sees [3,4,x]
+   * - picks length 4 for recoveryEndTxId
+   * - calls acceptRecovery()
+   * - crashes before finalizing
+   * second recovery:
+   * - sees [x, 4, 5]
+   * - should pick recovery length 4, even though it saw
+   *   a larger txid, because a previous recovery accepted it
+   */
+  @Test
+  public void testRecoverAfterIncompleteRecovery() throws Exception {
+    setupLoggers345();
+
+    // Shut down the logger that has length = 5
+    cluster.getJournalNode(2).stopAndJoin(0);
+
+    qjm = createSpyingQJM();
+    spies = qjm.getLoggerSetForTests().getLoggersForTests();
+
+    // Allow no logger to finalize
+    for (AsyncLogger spy : spies) {
+      TestQuorumJournalManagerUnit.futureThrows(new IOException("injected"))
+          .when(spy).finalizeLogSegment(Mockito.eq(1L),
+              Mockito.eq(4L));
+    }
+    try {
+      qjm.recoverUnfinalizedSegments();
+      fail("Should have failed recovery since no finalization occurred");
+    } catch (IOException ioe) {
+      GenericTestUtils.assertExceptionContains("injected", ioe);
+    }
+
+    // Now bring back the logger that had 5, and run recovery again.
+    // We should recover to 4, even though there's a longer log.
+    cluster.getJournalNode(0).stopAndJoin(0);
+    cluster.restartJournalNode(2);
+
+    qjm = createSpyingQJM();
+    spies = qjm.getLoggerSetForTests().getLoggersForTests();
+    qjm.recoverUnfinalizedSegments();
+    checkRecovery(cluster, 1, 4);
+  }
+
+  /**
+   * Set up the loggers into the following state:
+   * - JN0: edits 1-3 in progress
+   * - JN1: edits 1-4 in progress
+   * - JN2: edits 1-5 in progress
+   *
+   * None of the loggers have any associated paxos info.
+   */
+  private void setupLoggers345() throws Exception {
+    EditLogOutputStream stm = qjm.startLogSegment(1);
+
+    failLoggerAtTxn(spies.get(0), 4);
+    failLoggerAtTxn(spies.get(1), 5);
+
+    writeTxns(stm, 1, 3);
+
+    // This should succeed to 2/3 loggers
+    writeTxns(stm, 4, 1);
+
+    // This should only succeed to 1 logger (index 2). Hence it should
+    // fail.
+    try {
+      writeTxns(stm, 5, 1);
+      fail("Did not fail to write when only a minority succeeded");
+    } catch (QuorumException qe) {
+      GenericTestUtils.assertExceptionContains(
+          "too many exceptions to achieve quorum size 2/3",
+          qe);
+    }
+ }
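+
+  // Why the last write in setupLoggers345() must fail, as a sketch: loggers
+  // 0 and 1 have been failed at txids 4 and 5 respectively, so txn 5 only
+  // reaches logger 2, and one ack out of three is below the majority quorum:
+  //
+  //   int required = 3 / 2 + 1;    // == 2
+  //   int successes = 1;           // only logger index 2 acks txn 5
+  //   assert successes < required; // hence the QuorumException above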
+
+  /**
+   * Set up the following tricky edge case state which is used by
+   * multiple tests:
+   *
+   * Initial writer:
+   * - Writing to 3 JNs: JN0, JN1, JN2:
+   * - A log segment with txnid 1 through 100 succeeds.
+   * - The first transaction in the next segment only goes to JN0
+   *   before the writer crashes (e.g. it is partitioned).
+   *
+   * Recovery by another writer:
+   * - The new NN starts recovery and talks to all three. Thus, it sees
+   *   that the newest log segment which needs recovery is 101.
+   * - It sends the prepareRecovery(101) call, and decides that the
+   *   recovery length for 101 is only the 1 transaction.
+   * - It sends acceptRecovery(101-101) to only JN0, before crashing.
+   *
+   * This yields the following state:
+   * - JN0: 1-100 finalized, 101_inprogress, accepted recovery: 101-101
+   * - JN1: 1-100 finalized, 101_inprogress.empty
+   * - JN2: 1-100 finalized, 101_inprogress.empty
+   *   (the .empty files got moved aside during recovery)
+   * @throws Exception
+   */
+  private void setupEdgeCaseOneJnHasSegmentWithAcceptedRecovery() throws Exception {
+    // Log segment with txns 1-100 succeeds
+    writeSegment(cluster, qjm, 1, 100, true);
+
+    // startLogSegment only makes it to one of the three nodes
+    failLoggerAtTxn(spies.get(1), 101);
+    failLoggerAtTxn(spies.get(2), 101);
+
+    try {
+      writeSegment(cluster, qjm, 101, 1, true);
+      fail("Should have failed");
+    } catch (QuorumException qe) {
+      GenericTestUtils.assertExceptionContains("mock failure", qe);
+    } finally {
+      qjm.close();
+    }
+
+    // Recovery 1:
+    // make acceptRecovery() only make it to the node which has txid 101
+    // this should fail because only 1/3 accepted the recovery
+    qjm = createSpyingQJM();
+    spies = qjm.getLoggerSetForTests().getLoggersForTests();
+    futureThrows(new IOException("mock failure")).when(spies.get(1))
+        .acceptRecovery(Mockito.<SegmentStateProto>any(), Mockito.<URL>any());
+    futureThrows(new IOException("mock failure")).when(spies.get(2))
+        .acceptRecovery(Mockito.<SegmentStateProto>any(), Mockito.<URL>any());
+
+    try {
+      qjm.recoverUnfinalizedSegments();
+      fail("Should have failed to recover");
+    } catch (QuorumException qe) {
+      GenericTestUtils.assertExceptionContains("mock failure", qe);
+    } finally {
+      qjm.close();
+    }
+
+    // Check that we have entered the expected state as described in the
+    // method javadoc.
+    GenericTestUtils.assertGlobEquals(cluster.getCurrentDir(0, JID),
+        "edits_.*",
+        NNStorage.getFinalizedEditsFileName(1, 100),
+        NNStorage.getInProgressEditsFileName(101));
+    GenericTestUtils.assertGlobEquals(cluster.getCurrentDir(1, JID),
+        "edits_.*",
+        NNStorage.getFinalizedEditsFileName(1, 100),
+        NNStorage.getInProgressEditsFileName(101) + ".empty");
+    GenericTestUtils.assertGlobEquals(cluster.getCurrentDir(2, JID),
+        "edits_.*",
+        NNStorage.getFinalizedEditsFileName(1, 100),
+        NNStorage.getInProgressEditsFileName(101) + ".empty");
+
+    File paxos0 = new File(cluster.getCurrentDir(0, JID), "paxos");
+    File paxos1 = new File(cluster.getCurrentDir(1, JID), "paxos");
+    File paxos2 = new File(cluster.getCurrentDir(2, JID), "paxos");
+
+    GenericTestUtils.assertGlobEquals(paxos0, ".*", "101");
+    GenericTestUtils.assertGlobEquals(paxos1, ".*");
+    GenericTestUtils.assertGlobEquals(paxos2, ".*");
+  }
+
+  /**
+   * Test an edge case discovered by randomized testing.
+   *
+   * Starts with the edge case state set up by
+   * {@link #setupEdgeCaseOneJnHasSegmentWithAcceptedRecovery()}.
+   *
+   * Recovery 2:
+   * - New NN starts recovery and only talks to JN1 and JN2. JN0 has
+   *   crashed. Since they have no logs open, they say they don't need
+   *   recovery.
+   * - Starts writing segment 101, and writes 50 transactions before crashing.
+   *
+   * Recovery 3:
+   * - JN0 has come back to life.
+   * - New NN starts recovery and talks to all three. All three have
+   *   segments open from txid 101, so it calls prepareRecovery(101).
+   * - JN0 has an already-accepted value for segment 101, so it replies
+   *   "you should recover 101-101".
+   * - Former incorrect behavior: NN truncates logs to txid 101 even though
+   *   it should have recovered through 150.
+   *
+   * In this case, even though there is an accepted recovery decision,
+   * the newer log segments should take precedence, since they were written
+   * in a newer epoch than the recorded decision.
+   */
+  @Test
+  public void testNewerVersionOfSegmentWins() throws Exception {
+    setupEdgeCaseOneJnHasSegmentWithAcceptedRecovery();
+
+    // Now start writing again without JN0 present:
+    cluster.getJournalNode(0).stopAndJoin(0);
+
+    qjm = createSpyingQJM();
+    try {
+      assertEquals(100, QJMTestUtil.recoverAndReturnLastTxn(qjm));
+
+      // Write segment but do not finalize
+      writeSegment(cluster, qjm, 101, 50, false);
+    } finally {
+      qjm.close();
+    }
+
+    // Now try to recover a new writer, with JN0 present,
+    // and ensure that all of the above-written transactions are recovered.
+    cluster.restartJournalNode(0);
+    qjm = createSpyingQJM();
+    try {
+      assertEquals(150, QJMTestUtil.recoverAndReturnLastTxn(qjm));
+    } finally {
+      qjm.close();
+    }
+  }
+
+  /**
+   * Test another edge case discovered by randomized testing.
+   *
+   * Starts with the edge case state set up by
+   * {@link #setupEdgeCaseOneJnHasSegmentWithAcceptedRecovery()}.
+   *
+   * Recovery 2:
+   * - New NN starts recovery and only talks to JN1 and JN2. JN0 has
+   *   crashed. Since they have no logs open, they say they don't need
+   *   recovery.
+   * - Before writing any transactions, JN0 comes back to life and
+   *   JN1 crashes.
+   * - Starts writing segment 101, and writes 50 transactions before crashing.
+   *
+   * Recovery 3:
+   * - JN1 has come back to life. JN2 crashes.
+   * - New NN starts recovery and talks to all three. All three have
+   *   segments open from txid 101, so it calls prepareRecovery(101).
+   * - JN0 has an already-accepted value for segment 101, so it replies
+   *   "you should recover 101-101".
+   * - Former incorrect behavior: NN truncates logs to txid 101 even though
+   *   it should have recovered through 150.
+   *
+   * In this case, even though there is an accepted recovery decision,
+   * the newer log segments should take precedence, since they were written
+   * in a newer epoch than the recorded decision.
+   */
+  @Test
+  public void testNewerVersionOfSegmentWins2() throws Exception {
+    setupEdgeCaseOneJnHasSegmentWithAcceptedRecovery();
+
+    // Recover without JN0 present.
+    cluster.getJournalNode(0).stopAndJoin(0);
+
+    qjm = createSpyingQJM();
+    try {
+      assertEquals(100, QJMTestUtil.recoverAndReturnLastTxn(qjm));
+
+      // After recovery, JN0 comes back to life and JN1 crashes.
+      cluster.restartJournalNode(0);
+      cluster.getJournalNode(1).stopAndJoin(0);
+
+      // Write segment but do not finalize
+      writeSegment(cluster, qjm, 101, 50, false);
+    } finally {
+      qjm.close();
+    }
+
+    // State:
+    // JN0: 1-100 finalized, 101_inprogress (txns up to 150)
+    // Previously, JN0 had an accepted recovery 101-101 from an earlier
+    // recovery attempt.
+    // JN1: 1-100 finalized
+    // JN2: 1-100 finalized, 101_inprogress (txns up to 150)
+
+    // We need to test that the accepted recovery 101-101 on JN0 doesn't
+    // end up truncating the log back to 101.
+
+    cluster.restartJournalNode(1);
+    cluster.getJournalNode(2).stopAndJoin(0);
+
+    qjm = createSpyingQJM();
+    try {
+      assertEquals(150, QJMTestUtil.recoverAndReturnLastTxn(qjm));
+    } finally {
+      qjm.close();
+    }
+  }
+
+  @Test(timeout=20000)
+  public void testCrashBetweenSyncLogAndPersistPaxosData() throws Exception {
+    JournalFaultInjector faultInjector =
+        JournalFaultInjector.instance = Mockito.mock(JournalFaultInjector.class);
+
+    setupLoggers345();
+
+    // Run recovery where the client only talks to JN0, JN1, such that it
+    // decides that the correct length is through txid 4.
+    // Only allow it to call acceptRecovery() on JN0.
+    qjm = createSpyingQJM();
+    spies = qjm.getLoggerSetForTests().getLoggersForTests();
+    cluster.getJournalNode(2).stopAndJoin(0);
+    injectIOE().when(spies.get(1)).acceptRecovery(
+        Mockito.<SegmentStateProto>any(), Mockito.<URL>any());
+
+    tryRecoveryExpectingFailure();
+
+    cluster.restartJournalNode(2);
+
+    // State at this point:
+    // JN0: edit log for 1-4, paxos recovery data for txid 4
+    // JN1: edit log for 1-4
+    // JN2: edit log for 1-5
+
+    // Run recovery again, but don't allow JN0 to respond to the
+    // prepareRecovery() call. This will cause recovery to decide
+    // on txid 5.
+    // Additionally, crash all of the nodes before they persist
+    // any new paxos data.
+    qjm = createSpyingQJM();
+    spies = qjm.getLoggerSetForTests().getLoggersForTests();
+    injectIOE().when(spies.get(0)).prepareRecovery(Mockito.eq(1L));
+
+    Mockito.doThrow(new IOException("Injected")).when(faultInjector)
+        .beforePersistPaxosData();
+    tryRecoveryExpectingFailure();
+    Mockito.reset(faultInjector);
+
+    // State at this point:
+    // JN0: edit log for 1-5, paxos recovery data for txid 4
+    // !!! This is the interesting bit, above. The on-disk data and the
+    // paxos data don't match up!
+    // JN1: edit log for 1-5
+    // JN2: edit log for 1-5
+
+    // Now, stop JN2, and see if we can still start up even though
+    // JN0 is in a strange state where its log data is actually newer
+    // than its accepted Paxos state.
+
+    cluster.getJournalNode(2).stopAndJoin(0);
+
+    qjm = createSpyingQJM();
+    try {
+      long recovered = QJMTestUtil.recoverAndReturnLastTxn(qjm);
+      assertTrue(recovered >= 4); // 4 was committed to a quorum
+    } finally {
+      qjm.close();
+    }
+  }
+
+  private void tryRecoveryExpectingFailure() throws IOException {
+    try {
+      QJMTestUtil.recoverAndReturnLastTxn(qjm);
+      fail("Expected to fail recovery");
+    } catch (QuorumException qe) {
+      GenericTestUtils.assertExceptionContains("Injected", qe);
+    } finally {
+      qjm.close();
+    }
+
+  }
+
+  private Stubber injectIOE() {
+    return futureThrows(new IOException("Injected"));
+ }
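+
+  // injectIOE() returns a Mockito Stubber whose stubbed asynchronous call
+  // yields a future failing with IOException("Injected") -- the same
+  // mechanism as failLoggerAtTxn(), here aimed at recovery RPCs such as
+  // prepareRecovery() and acceptRecovery().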
+
+  @Test
+  public void testPurgeLogs() throws Exception {
+    for (int txid = 1; txid <= 5; txid++) {
+      writeSegment(cluster, qjm, txid, 1, true);
+    }
+    File curDir = cluster.getCurrentDir(0, JID);
+    GenericTestUtils.assertGlobEquals(curDir, "edits_.*",
+        NNStorage.getFinalizedEditsFileName(1, 1),
+        NNStorage.getFinalizedEditsFileName(2, 2),
+        NNStorage.getFinalizedEditsFileName(3, 3),
+        NNStorage.getFinalizedEditsFileName(4, 4),
+        NNStorage.getFinalizedEditsFileName(5, 5));
+    File paxosDir = new File(curDir, "paxos");
+    GenericTestUtils.assertExists(paxosDir);
+
+    // Create new files in the paxos directory, which should get purged too.
+    assertTrue(new File(paxosDir, "1").createNewFile());
+    assertTrue(new File(paxosDir, "3").createNewFile());
+
+    GenericTestUtils.assertGlobEquals(paxosDir, "\\d+",
+        "1", "3");
+
+    // Create some temporary files of the sort that are used during recovery.
+    assertTrue(new File(curDir,
+        "edits_inprogress_0000000000000000001.epoch=140").createNewFile());
+    assertTrue(new File(curDir,
+        "edits_inprogress_0000000000000000002.empty").createNewFile());
+
+    qjm.purgeLogsOlderThan(3);
+
+    // Log purging is asynchronous, so we have to wait for the calls
+    // to be sent and respond before verifying.
+    waitForAllPendingCalls(qjm.getLoggerSetForTests());
+
+    // Older edits should be purged
+    GenericTestUtils.assertGlobEquals(curDir, "edits_.*",
+        NNStorage.getFinalizedEditsFileName(3, 3),
+        NNStorage.getFinalizedEditsFileName(4, 4),
+        NNStorage.getFinalizedEditsFileName(5, 5));
+
+    // Older paxos files should be purged
+    GenericTestUtils.assertGlobEquals(paxosDir, "\\d+",
+        "3");
+  }
+
+  @Test
+  public void testToString() throws Exception {
+    GenericTestUtils.assertMatches(
+        qjm.toString(),
+        "QJM to \\[127.0.0.1:\\d+, 127.0.0.1:\\d+, 127.0.0.1:\\d+\\]");
+  }
+
+
+  private QuorumJournalManager createSpyingQJM()
+      throws IOException, URISyntaxException {
+    AsyncLogger.Factory spyFactory = new AsyncLogger.Factory() {
+      @Override
+      public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo,
+          String journalId, InetSocketAddress addr) {
+        AsyncLogger logger = new IPCLoggerChannel(conf, nsInfo, journalId, addr) {
+          @Override
+          protected ExecutorService createExecutor() {
+            // Don't parallelize calls to the quorum in the tests.
+            // This makes the tests more deterministic.
+            return MoreExecutors.sameThreadExecutor();
+          }
+        };
+
+        return Mockito.spy(logger);
+      }
+    };
+    return new QuorumJournalManager(
+        conf, cluster.getQuorumJournalURI(JID), FAKE_NSINFO, spyFactory);
+  }
+
+  private static void waitForAllPendingCalls(AsyncLoggerSet als)
+      throws InterruptedException {
+    for (AsyncLogger l : als.getLoggersForTests()) {
+      IPCLoggerChannel ch = (IPCLoggerChannel)l;
+      ch.waitForAllPendingCalls();
+    }
+ }
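+
+  // The cast in waitForAllPendingCalls() is safe in these tests because
+  // every logger is built by createSpyingQJM() as a Mockito spy of an
+  // IPCLoggerChannel subclass, and spying preserves the concrete type.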
+
+  /**
+   * Check that a quorum of nodes has finalized the segment starting at
+   * segmentTxId, and that each finalized copy ends at expectedEndTxId.
+   */
+  private void checkRecovery(MiniJournalCluster cluster,
+      long segmentTxId, long expectedEndTxId)
+      throws IOException {
+    int numFinalized = 0;
+    for (int i = 0; i < cluster.getNumNodes(); i++) {
+      File logDir = cluster.getCurrentDir(i, JID);
+      EditLogFile elf = FileJournalManager.getLogFile(logDir, segmentTxId);
+      if (elf == null) {
+        continue;
+      }
+      if (!elf.isInProgress()) {
+        numFinalized++;
+        if (elf.getLastTxId() != expectedEndTxId) {
+          fail("File " + elf + " finalized to wrong txid, expected " +
+              expectedEndTxId);
+        }
+      }
+    }
+
+    if (numFinalized < cluster.getQuorumSize()) {
+      fail("Did not find a quorum of finalized logs starting at " +
+          segmentTxId);
+    }
+  }
+}