|
@@ -19,6 +19,8 @@ package org.apache.hadoop.hdfs.qjournal.client;
|
|
|
|
|
|
import static org.junit.Assert.assertEquals;
|
|
|
import static org.junit.Assert.fail;
|
|
|
+import static org.mockito.Matchers.any;
|
|
|
+import static org.mockito.Matchers.anyInt;
|
|
|
import static org.mockito.Matchers.anyLong;
|
|
|
import static org.mockito.Matchers.eq;
|
|
|
|
|
@@ -28,9 +30,12 @@ import java.io.IOException;
|
|
|
import java.net.URI;
|
|
|
import java.util.List;
|
|
|
|
|
|
+import org.apache.hadoop.fs.CommonConfigurationKeys;
|
|
|
import org.junit.Assert;
|
|
|
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
+import org.apache.hadoop.fs.permission.PermissionStatus;
|
|
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
|
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
|
|
|
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
|
|
@@ -38,8 +43,12 @@ import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochR
|
|
|
import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
|
|
|
+import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
|
|
|
+import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
|
|
|
+import org.apache.hadoop.hdfs.server.namenode.INode;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
|
|
+import org.apache.hadoop.hdfs.server.namenode.NNStorage;
|
|
|
import org.apache.hadoop.test.GenericTestUtils;
|
|
|
import org.apache.log4j.Level;
|
|
|
import org.junit.Before;
|
|
@@ -57,6 +66,8 @@ import com.google.protobuf.ByteString;
|
|
|
import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.writeOp;
|
|
|
import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.createTxnData;
|
|
|
import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.verifyEdits;
|
|
|
+import static org.mockito.Mockito.mock;
|
|
|
+import static org.mockito.Mockito.times;
|
|
|
|
|
|
/**
|
|
|
* True unit tests for QuorumJournalManager
|
|
@@ -105,7 +116,7 @@ public class TestQuorumJournalManagerUnit {
|
|
|
}
|
|
|
|
|
|
private AsyncLogger mockLogger() {
|
|
|
- return Mockito.mock(AsyncLogger.class);
|
|
|
+ return mock(AsyncLogger.class);
|
|
|
}
|
|
|
|
|
|
static <V> Stubber futureReturns(V value) {
|
|
@@ -202,7 +213,53 @@ public class TestQuorumJournalManagerUnit {
|
|
|
anyLong(), eq(3L), eq(1), Mockito.<byte[]>any());
|
|
|
stm.flush();
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+ @Test(expected = IllegalArgumentException.class)
|
|
|
+ public void testSetOutputBufferCapacityTooLarge() throws Exception {
|
|
|
+ qjm.setOutputBufferCapacity(
|
|
|
+ CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT + 1);
|
|
|
+ }
|
|
|
+
|
|
|
+ // Regression test for HDFS-13977
|
|
|
+ @Test
|
|
|
+ public void testFSEditLogAutoSyncToQuorumStream() throws Exception {
|
|
|
+ // Set the buffer capacity low to make it easy to fill it
|
|
|
+ qjm.setOutputBufferCapacity(512);
|
|
|
+
|
|
|
+ // Set up mocks
|
|
|
+ NNStorage mockStorage = mock(NNStorage.class);
|
|
|
+ createLogSegment(); // sets up to the mocks for startLogSegment
|
|
|
+ for (int logIdx = 0; logIdx < 3; logIdx++) {
|
|
|
+ futureReturns(null).when(spyLoggers.get(logIdx))
|
|
|
+ .sendEdits(anyLong(), anyLong(), anyInt(), any(byte[].class));
|
|
|
+ }
|
|
|
+ PermissionStatus permStat = PermissionStatus
|
|
|
+ .createImmutable("user", "group", FsPermission.getDefault());
|
|
|
+ INode fakeInode = FSImageTestUtil.createEmptyInodeFile(1, "foo",
|
|
|
+ permStat, 1, 1, (short) 1, 1);
|
|
|
+
|
|
|
+ // Create a fake FSEditLog using this QJM
|
|
|
+ String mockQjmEdits = "qjournal://mock/";
|
|
|
+ conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY, mockQjmEdits);
|
|
|
+ conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY, mockQjmEdits);
|
|
|
+ FSEditLog editLog = FSImageTestUtil.createEditLogWithJournalManager(
|
|
|
+ conf, mockStorage, URI.create(mockQjmEdits), qjm);
|
|
|
+
|
|
|
+ editLog.initJournalsForWrite();
|
|
|
+ FSImageTestUtil.startLogSegment(editLog, 1, false,
|
|
|
+ NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
|
|
|
+ // Write enough edit ops that the output buffer capacity should fill and
|
|
|
+ // an auto-sync should be triggered
|
|
|
+ for (int i = 0; i < 12; i++) {
|
|
|
+ editLog.logMkDir("/fake/path", fakeInode);
|
|
|
+ }
|
|
|
+
|
|
|
+ for (int i = 0; i < 3; i++) {
|
|
|
+ Mockito.verify(spyLoggers.get(i), times(1))
|
|
|
+ .sendEdits(eq(1L), eq(1L), anyInt(), any(byte[].class));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
@Test
|
|
|
public void testWriteEditsOneSlow() throws Exception {
|
|
|
EditLogOutputStream stm = createLogSegment();
|