|
@@ -63,7 +63,7 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase {
|
|
|
|
|
|
@BeforeEach
|
|
@BeforeEach
|
|
public void setUp() throws Exception {
|
|
public void setUp() throws Exception {
|
|
- processedRequests = new LinkedBlockingQueue<Request>();
|
|
|
|
|
|
+ processedRequests = new LinkedBlockingQueue<>();
|
|
processor = new MockCommitProcessor();
|
|
processor = new MockCommitProcessor();
|
|
CommitProcessor.setMaxReadBatchSize(-1);
|
|
CommitProcessor.setMaxReadBatchSize(-1);
|
|
CommitProcessor.setMaxCommitBatchSize(1);
|
|
CommitProcessor.setMaxCommitBatchSize(1);
|
|
@@ -174,8 +174,8 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase {
|
|
@Test
|
|
@Test
|
|
public void processAsMuchUncommittedRequestsAsPossibleTest() throws Exception {
|
|
public void processAsMuchUncommittedRequestsAsPossibleTest() throws Exception {
|
|
final String path = "/testAsMuchAsPossible";
|
|
final String path = "/testAsMuchAsPossible";
|
|
- List<Request> shouldBeProcessed = new LinkedList<Request>();
|
|
|
|
- Set<Request> shouldNotBeProcessed = new HashSet<Request>();
|
|
|
|
|
|
+ List<Request> shouldBeProcessed = new LinkedList<>();
|
|
|
|
+ Set<Request> shouldNotBeProcessed = new HashSet<>();
|
|
for (int sessionId = 1; sessionId <= 5; ++sessionId) {
|
|
for (int sessionId = 1; sessionId <= 5; ++sessionId) {
|
|
for (int readReqId = 1; readReqId <= sessionId; ++readReqId) {
|
|
for (int readReqId = 1; readReqId <= sessionId; ++readReqId) {
|
|
Request readReq = newRequest(new GetDataRequest(path, false), OpCode.getData, sessionId, readReqId);
|
|
Request readReq = newRequest(new GetDataRequest(path, false), OpCode.getData, sessionId, readReqId);
|
|
@@ -225,8 +225,8 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase {
|
|
@Test
|
|
@Test
|
|
public void processAllFollowingUncommittedAfterFirstCommitTest() throws Exception {
|
|
public void processAllFollowingUncommittedAfterFirstCommitTest() throws Exception {
|
|
final String path = "/testUncommittedFollowingCommited";
|
|
final String path = "/testUncommittedFollowingCommited";
|
|
- Set<Request> shouldBeInPending = new HashSet<Request>();
|
|
|
|
- Set<Request> shouldBeProcessedAfterPending = new HashSet<Request>();
|
|
|
|
|
|
+ Set<Request> shouldBeInPending = new HashSet<>();
|
|
|
|
+ Set<Request> shouldBeProcessedAfterPending = new HashSet<>();
|
|
|
|
|
|
Request writeReq = newRequest(
|
|
Request writeReq = newRequest(
|
|
new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
|
|
new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
|
|
@@ -287,7 +287,7 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase {
|
|
@Test
|
|
@Test
|
|
public void processAllWritesMaxBatchSize() throws Exception {
|
|
public void processAllWritesMaxBatchSize() throws Exception {
|
|
final String path = "/processAllWritesMaxBatchSize";
|
|
final String path = "/processAllWritesMaxBatchSize";
|
|
- HashSet<Request> shouldBeProcessedAfterPending = new HashSet<Request>();
|
|
|
|
|
|
+ HashSet<Request> shouldBeProcessedAfterPending = new HashSet<>();
|
|
|
|
|
|
Request writeReq = newRequest(
|
|
Request writeReq = newRequest(
|
|
new CreateRequest(
|
|
new CreateRequest(
|
|
@@ -423,7 +423,7 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase {
|
|
public void noStarvationOfNonLocalCommittedRequestsTest() throws Exception {
|
|
public void noStarvationOfNonLocalCommittedRequestsTest() throws Exception {
|
|
final String path = "/noStarvationOfCommittedRequests";
|
|
final String path = "/noStarvationOfCommittedRequests";
|
|
processor.queuedRequests = new MockRequestsQueue();
|
|
processor.queuedRequests = new MockRequestsQueue();
|
|
- Set<Request> nonLocalCommits = new HashSet<Request>();
|
|
|
|
|
|
+ Set<Request> nonLocalCommits = new HashSet<>();
|
|
for (int i = 0; i < 10; i++) {
|
|
for (int i = 0; i < 10; i++) {
|
|
Request nonLocalCommitReq = newRequest(
|
|
Request nonLocalCommitReq = newRequest(
|
|
new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
|
|
new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
|
|
@@ -461,7 +461,7 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase {
|
|
processor.queuedRequests.add(firstCommittedReq);
|
|
processor.queuedRequests.add(firstCommittedReq);
|
|
processor.queuedWriteRequests.add(firstCommittedReq);
|
|
processor.queuedWriteRequests.add(firstCommittedReq);
|
|
processor.committedRequests.add(firstCommittedReq);
|
|
processor.committedRequests.add(firstCommittedReq);
|
|
- Set<Request> allReads = new HashSet<Request>();
|
|
|
|
|
|
+ Set<Request> allReads = new HashSet<>();
|
|
|
|
|
|
// +1 read request to queuedRequests
|
|
// +1 read request to queuedRequests
|
|
Request firstRead = newRequest(new GetDataRequest(path, false), OpCode.getData, 0x1, 0);
|
|
Request firstRead = newRequest(new GetDataRequest(path, false), OpCode.getData, 0x1, 0);
|
|
@@ -476,7 +476,7 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase {
|
|
2);
|
|
2);
|
|
processor.committedRequests.add(secondCommittedReq);
|
|
processor.committedRequests.add(secondCommittedReq);
|
|
|
|
|
|
- Set<Request> waitingCommittedRequests = new HashSet<Request>();
|
|
|
|
|
|
+ Set<Request> waitingCommittedRequests = new HashSet<>();
|
|
// +99 non local committed requests
|
|
// +99 non local committed requests
|
|
for (int writeReqId = 3; writeReqId < 102; ++writeReqId) {
|
|
for (int writeReqId = 3; writeReqId < 102; ++writeReqId) {
|
|
Request writeReq = newRequest(
|
|
Request writeReq = newRequest(
|
|
@@ -530,7 +530,7 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase {
|
|
final int firstCXid = 0x100;
|
|
final int firstCXid = 0x100;
|
|
int readReqId = firstCXid;
|
|
int readReqId = firstCXid;
|
|
processor.stoppedMainLoop = true;
|
|
processor.stoppedMainLoop = true;
|
|
- HashSet<Request> localRequests = new HashSet<Request>();
|
|
|
|
|
|
+ HashSet<Request> localRequests = new HashSet<>();
|
|
// queue the blocking write request to queuedRequests
|
|
// queue the blocking write request to queuedRequests
|
|
Request firstCommittedReq = newRequest(
|
|
Request firstCommittedReq = newRequest(
|
|
new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
|
|
new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
|
|
@@ -592,7 +592,7 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase {
|
|
final int numberofReads = 10;
|
|
final int numberofReads = 10;
|
|
int readReqId = lastCXid;
|
|
int readReqId = lastCXid;
|
|
processor.stoppedMainLoop = true;
|
|
processor.stoppedMainLoop = true;
|
|
- HashSet<Request> localRequests = new HashSet<Request>();
|
|
|
|
|
|
+ HashSet<Request> localRequests = new HashSet<>();
|
|
|
|
|
|
// queue the blocking write request to queuedRequests
|
|
// queue the blocking write request to queuedRequests
|
|
Request orphanCommittedReq = newRequest(
|
|
Request orphanCommittedReq = newRequest(
|