|
@@ -175,8 +175,19 @@ public final class XceiverServerRatis implements XceiverServerSpi {
|
|
|
OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY,
|
|
|
OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_DEFAULT,
|
|
|
StorageUnit.BYTES);
|
|
|
- RaftServerConfigKeys.Log.Appender.setBufferCapacity(properties,
|
|
|
- SizeInBytes.valueOf(raftSegmentPreallocatedSize));
|
|
|
+ int logAppenderQueueNumElements = conf.getInt(
|
|
|
+ OzoneConfigKeys.DFS_CONTAINER_RATIS_LOG_APPENDER_QUEUE_NUM_ELEMENTS,
|
|
|
+ OzoneConfigKeys
|
|
|
+ .DFS_CONTAINER_RATIS_LOG_APPENDER_QUEUE_NUM_ELEMENTS_DEFAULT);
|
|
|
+ final int logAppenderQueueByteLimit = (int) conf.getStorageSize(
|
|
|
+ OzoneConfigKeys.DFS_CONTAINER_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT,
|
|
|
+ OzoneConfigKeys
|
|
|
+ .DFS_CONTAINER_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT,
|
|
|
+ StorageUnit.BYTES);
|
|
|
+ RaftServerConfigKeys.Log.Appender
|
|
|
+ .setBufferElementLimit(properties, logAppenderQueueNumElements);
|
|
|
+ RaftServerConfigKeys.Log.Appender.setBufferByteLimit(properties,
|
|
|
+ SizeInBytes.valueOf(logAppenderQueueByteLimit));
|
|
|
RaftServerConfigKeys.Log.setPreallocatedSize(properties,
|
|
|
SizeInBytes.valueOf(raftSegmentPreallocatedSize));
|
|
|
|
|
@@ -255,8 +266,6 @@ public final class XceiverServerRatis implements XceiverServerSpi {
|
|
|
leaderElectionMinTimeout.toLong(TimeUnit.MILLISECONDS) + 200;
|
|
|
RaftServerConfigKeys.Rpc.setTimeoutMax(properties,
|
|
|
TimeDuration.valueOf(leaderElectionMaxTimeout, TimeUnit.MILLISECONDS));
|
|
|
- // Enable batch append on raft server
|
|
|
- RaftServerConfigKeys.Log.Appender.setBatchEnabled(properties, true);
|
|
|
|
|
|
// Set the maximum cache segments
|
|
|
RaftServerConfigKeys.Log.setMaxCachedSegmentNum(properties, 2);
|
|
@@ -299,10 +308,15 @@ public final class XceiverServerRatis implements XceiverServerSpi {
|
|
|
setAutoTriggerEnabled(properties, true);
|
|
|
RaftServerConfigKeys.Snapshot.
|
|
|
setAutoTriggerThreshold(properties, snapshotThreshold);
|
|
|
- int logQueueSize =
|
|
|
- conf.getInt(OzoneConfigKeys.DFS_CONTAINER_RATIS_LOG_QUEUE_SIZE,
|
|
|
- OzoneConfigKeys.DFS_CONTAINER_RATIS_LOG_QUEUE_SIZE_DEFAULT);
|
|
|
- RaftServerConfigKeys.Log.setQueueSize(properties, logQueueSize);
|
|
|
+ int logQueueNumElements =
|
|
|
+ conf.getInt(OzoneConfigKeys.DFS_CONTAINER_RATIS_LOG_QUEUE_NUM_ELEMENTS,
|
|
|
+ OzoneConfigKeys.DFS_CONTAINER_RATIS_LOG_QUEUE_NUM_ELEMENTS_DEFAULT);
|
|
|
+ final int logQueueByteLimit = (int) conf.getStorageSize(
|
|
|
+ OzoneConfigKeys.DFS_CONTAINER_RATIS_LOG_QUEUE_BYTE_LIMIT,
|
|
|
+ OzoneConfigKeys.DFS_CONTAINER_RATIS_LOG_QUEUE_BYTE_LIMIT_DEFAULT,
|
|
|
+ StorageUnit.BYTES);
|
|
|
+ RaftServerConfigKeys.Log.setElementLimit(properties, logQueueNumElements);
|
|
|
+ RaftServerConfigKeys.Log.setByteLimit(properties, logQueueByteLimit);
|
|
|
|
|
|
int numSyncRetries = conf.getInt(
|
|
|
OzoneConfigKeys.DFS_CONTAINER_RATIS_STATEMACHINEDATA_SYNC_RETRIES,
|
|
@@ -409,7 +423,7 @@ public final class XceiverServerRatis implements XceiverServerSpi {
|
|
|
RaftClientReply reply;
|
|
|
RaftClientRequest raftClientRequest =
|
|
|
createRaftClientRequest(request, pipelineID,
|
|
|
- RaftClientRequest.writeRequestType(replicationLevel));
|
|
|
+ RaftClientRequest.writeRequestType());
|
|
|
try {
|
|
|
reply = server.submitClientRequestAsync(raftClientRequest).get();
|
|
|
} catch (Exception e) {
|