|
@@ -28,6 +28,7 @@ import org.apache.hadoop.ozone.container.common.transport.server
|
|
.XceiverServerSpi;
|
|
.XceiverServerSpi;
|
|
|
|
|
|
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
|
|
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
|
|
|
|
+import org.apache.hadoop.scm.ScmConfigKeys;
|
|
import org.apache.ratis.RaftConfigKeys;
|
|
import org.apache.ratis.RaftConfigKeys;
|
|
import org.apache.ratis.conf.RaftProperties;
|
|
import org.apache.ratis.conf.RaftProperties;
|
|
import org.apache.ratis.grpc.GrpcConfigKeys;
|
|
import org.apache.ratis.grpc.GrpcConfigKeys;
|
|
@@ -37,6 +38,7 @@ import org.apache.ratis.rpc.RpcType;
|
|
import org.apache.ratis.rpc.SupportedRpcType;
|
|
import org.apache.ratis.rpc.SupportedRpcType;
|
|
import org.apache.ratis.server.RaftServer;
|
|
import org.apache.ratis.server.RaftServer;
|
|
import org.apache.ratis.server.RaftServerConfigKeys;
|
|
import org.apache.ratis.server.RaftServerConfigKeys;
|
|
|
|
+import org.apache.ratis.util.SizeInBytes;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.LoggerFactory;
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
@@ -57,23 +59,43 @@ public final class XceiverServerRatis implements XceiverServerSpi {
|
|
private final RaftServer server;
|
|
private final RaftServer server;
|
|
|
|
|
|
private XceiverServerRatis(DatanodeID id, int port, String storageDir,
|
|
private XceiverServerRatis(DatanodeID id, int port, String storageDir,
|
|
- ContainerDispatcher dispatcher, RpcType rpcType) throws IOException {
|
|
|
|
|
|
+ ContainerDispatcher dispatcher, RpcType rpcType, int maxChunkSize,
|
|
|
|
+ int raftSegmentSize) throws IOException {
|
|
Objects.requireNonNull(id, "id == null");
|
|
Objects.requireNonNull(id, "id == null");
|
|
this.port = port;
|
|
this.port = port;
|
|
|
|
+ RaftProperties serverProperties = newRaftProperties(rpcType, port,
|
|
|
|
+ storageDir, maxChunkSize, raftSegmentSize);
|
|
|
|
|
|
this.server = RaftServer.newBuilder()
|
|
this.server = RaftServer.newBuilder()
|
|
.setServerId(RatisHelper.toRaftPeerId(id))
|
|
.setServerId(RatisHelper.toRaftPeerId(id))
|
|
.setGroup(RatisHelper.emptyRaftGroup())
|
|
.setGroup(RatisHelper.emptyRaftGroup())
|
|
- .setProperties(newRaftProperties(rpcType, port, storageDir))
|
|
|
|
|
|
+ .setProperties(serverProperties)
|
|
.setStateMachine(new ContainerStateMachine(dispatcher))
|
|
.setStateMachine(new ContainerStateMachine(dispatcher))
|
|
.build();
|
|
.build();
|
|
}
|
|
}
|
|
|
|
|
|
- static RaftProperties newRaftProperties(
|
|
|
|
- RpcType rpc, int port, String storageDir) {
|
|
|
|
|
|
+ private static RaftProperties newRaftProperties(
|
|
|
|
+ RpcType rpc, int port, String storageDir, int scmChunkSize,
|
|
|
|
+ int raftSegmentSize) {
|
|
final RaftProperties properties = new RaftProperties();
|
|
final RaftProperties properties = new RaftProperties();
|
|
|
|
+ RaftServerConfigKeys.Log.Appender.setBatchEnabled(properties, true);
|
|
|
|
+ RaftServerConfigKeys.Log.Appender.setBufferCapacity(properties,
|
|
|
|
+ SizeInBytes.valueOf(raftSegmentSize));
|
|
|
|
+ RaftServerConfigKeys.Log.setWriteBufferSize(properties,
|
|
|
|
+ SizeInBytes.valueOf(scmChunkSize));
|
|
|
|
+ RaftServerConfigKeys.Log.setPreallocatedSize(properties,
|
|
|
|
+ SizeInBytes.valueOf(raftSegmentSize));
|
|
|
|
+ RaftServerConfigKeys.Log.setSegmentSizeMax(properties,
|
|
|
|
+ SizeInBytes.valueOf(raftSegmentSize));
|
|
RaftServerConfigKeys.setStorageDir(properties, new File(storageDir));
|
|
RaftServerConfigKeys.setStorageDir(properties, new File(storageDir));
|
|
RaftConfigKeys.Rpc.setType(properties, rpc);
|
|
RaftConfigKeys.Rpc.setType(properties, rpc);
|
|
|
|
+
|
|
|
|
+ //TODO: change these configs to setter after RATIS-154
|
|
|
|
+ properties.setInt("raft.server.log.segment.cache.num.max", 2);
|
|
|
|
+ properties.setInt("raft.grpc.message.size.max",
|
|
|
|
+ scmChunkSize + raftSegmentSize);
|
|
|
|
+ properties.setInt("raft.server.rpc.timeout.min", 500);
|
|
|
|
+ properties.setInt("raft.server.rpc.timeout.max", 600);
|
|
if (rpc == SupportedRpcType.GRPC) {
|
|
if (rpc == SupportedRpcType.GRPC) {
|
|
GrpcConfigKeys.Server.setPort(properties, port);
|
|
GrpcConfigKeys.Server.setPort(properties, port);
|
|
} else {
|
|
} else {
|
|
@@ -108,6 +130,10 @@ public final class XceiverServerRatis implements XceiverServerSpi {
|
|
OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
|
|
OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
|
|
OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
|
|
OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
|
|
final RpcType rpc = SupportedRpcType.valueOfIgnoreCase(rpcType);
|
|
final RpcType rpc = SupportedRpcType.valueOfIgnoreCase(rpcType);
|
|
|
|
+ final int raftSegmentSize =
|
|
|
|
+ ozoneConf.getInt(ScmConfigKeys.OZONE_SCM_RATIS_SEGMENT_SIZE_KEY,
|
|
|
|
+ ScmConfigKeys.OZONE_SCM_RATIS_SEGMENT_SIZE_DEFAULT);
|
|
|
|
+ final int maxChunkSize = ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE;
|
|
|
|
|
|
// Get an available port on current node and
|
|
// Get an available port on current node and
|
|
// use that as the container port
|
|
// use that as the container port
|
|
@@ -133,7 +159,7 @@ public final class XceiverServerRatis implements XceiverServerSpi {
|
|
}
|
|
}
|
|
datanodeID.setRatisPort(localPort);
|
|
datanodeID.setRatisPort(localPort);
|
|
return new XceiverServerRatis(datanodeID, localPort, storageDir,
|
|
return new XceiverServerRatis(datanodeID, localPort, storageDir,
|
|
- dispatcher, rpc);
|
|
|
|
|
|
+ dispatcher, rpc, maxChunkSize, raftSegmentSize);
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|