|
@@ -0,0 +1,271 @@
|
|
|
+/*
|
|
|
+ * Licensed to the Apache Software Foundation (ASF) under one
|
|
|
+ * or more contributor license agreements. See the NOTICE file
|
|
|
+ * distributed with this work for additional information
|
|
|
+ * regarding copyright ownership. The ASF licenses this file
|
|
|
+ * to you under the Apache License, Version 2.0 (the
|
|
|
+ * "License"); you may not use this file except in compliance
|
|
|
+ * with the License. You may obtain a copy of the License at
|
|
|
+ *
|
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
|
+ *
|
|
|
+ * Unless required by applicable law or agreed to in writing, software
|
|
|
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
+ * See the License for the specific language governing permissions and
|
|
|
+ * limitations under the License.
|
|
|
+ */
|
|
|
+
|
|
|
+package org.apache.hadoop.ozone.om.ratis;
|
|
|
+
|
|
|
+import com.google.common.annotations.VisibleForTesting;
|
|
|
+import com.google.common.base.Strings;
|
|
|
+import java.io.File;
|
|
|
+import java.io.IOException;
|
|
|
+import java.net.InetSocketAddress;
|
|
|
+import java.net.ServerSocket;
|
|
|
+import java.net.SocketAddress;
|
|
|
+import java.util.Collections;
|
|
|
+import java.util.Objects;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
+
|
|
|
+import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.conf.StorageUnit;
|
|
|
+import org.apache.hadoop.hdds.scm.HddsServerUtil;
|
|
|
+import org.apache.hadoop.ozone.om.OMConfigKeys;
|
|
|
+import org.apache.ratis.RaftConfigKeys;
|
|
|
+import org.apache.ratis.client.RaftClientConfigKeys;
|
|
|
+import org.apache.ratis.conf.RaftProperties;
|
|
|
+import org.apache.ratis.grpc.GrpcConfigKeys;
|
|
|
+import org.apache.ratis.netty.NettyConfigKeys;
|
|
|
+import org.apache.ratis.protocol.RaftGroupId;
|
|
|
+import org.apache.ratis.protocol.RaftPeerId;
|
|
|
+import org.apache.ratis.rpc.RpcType;
|
|
|
+import org.apache.ratis.rpc.SupportedRpcType;
|
|
|
+import org.apache.ratis.server.RaftServer;
|
|
|
+import org.apache.ratis.server.RaftServerConfigKeys;
|
|
|
+import org.apache.ratis.statemachine.impl.BaseStateMachine;
|
|
|
+import org.apache.ratis.util.LifeCycle;
|
|
|
+import org.apache.ratis.util.SizeInBytes;
|
|
|
+import org.apache.ratis.util.TimeDuration;
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
+
|
|
|
+/**
|
|
|
+ * Creates a Ratis server endpoint for OM.
|
|
|
+ */
|
|
|
+public final class OzoneManagerRatisServer {
|
|
|
+ private static final Logger LOG = LoggerFactory
|
|
|
+ .getLogger(OzoneManagerRatisServer.class);
|
|
|
+
|
|
|
+ private final int port;
|
|
|
+ private final RaftServer server;
|
|
|
+
|
|
|
+ private OzoneManagerRatisServer(String omId, int port, Configuration conf)
|
|
|
+ throws IOException {
|
|
|
+ Objects.requireNonNull(omId, "omId == null");
|
|
|
+ this.port = port;
|
|
|
+ RaftProperties serverProperties = newRaftProperties(conf);
|
|
|
+
|
|
|
+ this.server = RaftServer.newBuilder()
|
|
|
+ .setServerId(RaftPeerId.valueOf(omId))
|
|
|
+ .setProperties(serverProperties)
|
|
|
+ .setStateMachineRegistry(this::getStateMachine)
|
|
|
+ .build();
|
|
|
+ }
|
|
|
+
|
|
|
+ public static OzoneManagerRatisServer newOMRatisServer(String omId,
|
|
|
+ Configuration ozoneConf) throws IOException {
|
|
|
+ int localPort = ozoneConf.getInt(
|
|
|
+ OMConfigKeys.OZONE_OM_RATIS_PORT_KEY,
|
|
|
+ OMConfigKeys.OZONE_OM_RATIS_PORT_DEFAULT);
|
|
|
+
|
|
|
+ // Get an available port on current node and
|
|
|
+ // use that as the container port
|
|
|
+ if (ozoneConf.getBoolean(
|
|
|
+ OMConfigKeys.OZONE_OM_RATIS_RANDOM_PORT_KEY,
|
|
|
+ OMConfigKeys.OZONE_OM_RATIS_RANDOM_PORT_KEY_DEFAULT)) {
|
|
|
+ try (ServerSocket socket = new ServerSocket()) {
|
|
|
+ socket.setReuseAddress(true);
|
|
|
+ SocketAddress address = new InetSocketAddress(0);
|
|
|
+ socket.bind(address);
|
|
|
+ localPort = socket.getLocalPort();
|
|
|
+ LOG.info("Found a free port for the OM Ratis server : {}", localPort);
|
|
|
+ } catch (IOException e) {
|
|
|
+ LOG.error("Unable find a random free port for the server, "
|
|
|
+ + "fallback to use default port {}", localPort, e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return new OzoneManagerRatisServer(omId, localPort, ozoneConf);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Return a dummy StateMachine.
|
|
|
+ * TODO: Implement a state machine on OM.
|
|
|
+ */
|
|
|
+ private BaseStateMachine getStateMachine(RaftGroupId gid) {
|
|
|
+ return new BaseStateMachine();
|
|
|
+ }
|
|
|
+
|
|
|
+ public void start() throws IOException {
|
|
|
+ LOG.info("Starting {} {} at port {}", getClass().getSimpleName(),
|
|
|
+ server.getId(), port);
|
|
|
+ server.start();
|
|
|
+ }
|
|
|
+
|
|
|
+ public void stop() {
|
|
|
+ try {
|
|
|
+ server.close();
|
|
|
+ } catch (IOException e) {
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private RaftProperties newRaftProperties(Configuration conf) {
|
|
|
+ final RaftProperties properties = new RaftProperties();
|
|
|
+
|
|
|
+ // Set RPC type
|
|
|
+ final String rpcType = conf.get(
|
|
|
+ OMConfigKeys.OZONE_OM_RATIS_RPC_TYPE_KEY,
|
|
|
+ OMConfigKeys.OZONE_OM_RATIS_RPC_TYPE_DEFAULT);
|
|
|
+ final RpcType rpc = SupportedRpcType.valueOfIgnoreCase(rpcType);
|
|
|
+ RaftConfigKeys.Rpc.setType(properties, rpc);
|
|
|
+
|
|
|
+ // Set the ratis port number
|
|
|
+ if (rpc == SupportedRpcType.GRPC) {
|
|
|
+ GrpcConfigKeys.Server.setPort(properties, port);
|
|
|
+ } else if (rpc == SupportedRpcType.NETTY) {
|
|
|
+ NettyConfigKeys.Server.setPort(properties, port);
|
|
|
+ }
|
|
|
+
|
|
|
+ // Set Ratis storage directory
|
|
|
+ String storageDir = getOMRatisDirectory(conf);
|
|
|
+ RaftServerConfigKeys.setStorageDirs(properties,
|
|
|
+ Collections.singletonList(new File(storageDir)));
|
|
|
+
|
|
|
+ // Set RAFT segment size
|
|
|
+ final int raftSegmentSize = (int) conf.getStorageSize(
|
|
|
+ OMConfigKeys.OZONE_OM_RATIS_SEGMENT_SIZE_KEY,
|
|
|
+ OMConfigKeys.OZONE_OM_RATIS_SEGMENT_SIZE_DEFAULT,
|
|
|
+ StorageUnit.BYTES);
|
|
|
+ RaftServerConfigKeys.Log.setSegmentSizeMax(properties,
|
|
|
+ SizeInBytes.valueOf(raftSegmentSize));
|
|
|
+
|
|
|
+ // Set RAFT segment pre-allocated size
|
|
|
+ final int raftSegmentPreallocatedSize = (int) conf.getStorageSize(
|
|
|
+ OMConfigKeys.OZONE_OM_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY,
|
|
|
+ OMConfigKeys.OZONE_OM_RATIS_SEGMENT_PREALLOCATED_SIZE_DEFAULT,
|
|
|
+ StorageUnit.BYTES);
|
|
|
+ RaftServerConfigKeys.Log.Appender.setBufferCapacity(properties,
|
|
|
+ SizeInBytes.valueOf(raftSegmentPreallocatedSize));
|
|
|
+ RaftServerConfigKeys.Log.setPreallocatedSize(properties,
|
|
|
+ SizeInBytes.valueOf(raftSegmentPreallocatedSize));
|
|
|
+
|
|
|
+ // For grpc set the maximum message size
|
|
|
+ // TODO: calculate the max message size based on the max size of a
|
|
|
+ // PutSmallFileRequest's file size limit
|
|
|
+ GrpcConfigKeys.setMessageSizeMax(properties,
|
|
|
+ SizeInBytes.valueOf(raftSegmentPreallocatedSize));
|
|
|
+
|
|
|
+ // Set the server request timeout
|
|
|
+ TimeUnit serverRequestTimeoutUnit =
|
|
|
+ OMConfigKeys.OZONE_OM_RATIS_SERVER_REQUEST_TIMEOUT_DEFAULT.getUnit();
|
|
|
+ long serverRequestTimeoutDuration = conf.getTimeDuration(
|
|
|
+ OMConfigKeys.OZONE_OM_RATIS_SERVER_REQUEST_TIMEOUT_KEY,
|
|
|
+ OMConfigKeys.OZONE_OM_RATIS_SERVER_REQUEST_TIMEOUT_DEFAULT
|
|
|
+ .getDuration(), serverRequestTimeoutUnit);
|
|
|
+ final TimeDuration serverRequestTimeout = TimeDuration.valueOf(
|
|
|
+ serverRequestTimeoutDuration, serverRequestTimeoutUnit);
|
|
|
+ RaftServerConfigKeys.Rpc.setRequestTimeout(properties,
|
|
|
+ serverRequestTimeout);
|
|
|
+
|
|
|
+ // Set timeout for server retry cache entry
|
|
|
+ TimeUnit retryCacheTimeoutUnit = OMConfigKeys
|
|
|
+ .OZONE_OM_RATIS_SERVER_RETRY_CACHE_TIMEOUT_DEFAULT.getUnit();
|
|
|
+ long retryCacheTimeoutDuration = conf.getTimeDuration(
|
|
|
+ OMConfigKeys.OZONE_OM_RATIS_SERVER_RETRY_CACHE_TIMEOUT_KEY,
|
|
|
+ OMConfigKeys.OZONE_OM_RATIS_SERVER_RETRY_CACHE_TIMEOUT_DEFAULT
|
|
|
+ .getDuration(), retryCacheTimeoutUnit);
|
|
|
+ final TimeDuration retryCacheTimeout = TimeDuration.valueOf(
|
|
|
+ retryCacheTimeoutDuration, retryCacheTimeoutUnit);
|
|
|
+ RaftServerConfigKeys.RetryCache
|
|
|
+ .setExpiryTime(properties, retryCacheTimeout);
|
|
|
+
|
|
|
+ // Set the server min and max timeout
|
|
|
+ TimeUnit serverMinTimeoutUnit =
|
|
|
+ OMConfigKeys.OZONE_OM_RATIS_MINIMUM_TIMEOUT_DEFAULT.getUnit();
|
|
|
+ long serverMinTimeoutDuration = conf.getTimeDuration(
|
|
|
+ OMConfigKeys.OZONE_OM_RATIS_MINIMUM_TIMEOUT_KEY,
|
|
|
+ OMConfigKeys.OZONE_OM_RATIS_MINIMUM_TIMEOUT_DEFAULT
|
|
|
+ .getDuration(), serverMinTimeoutUnit);
|
|
|
+ final TimeDuration serverMinTimeout = TimeDuration.valueOf(
|
|
|
+ serverMinTimeoutDuration, serverMinTimeoutUnit);
|
|
|
+ long serverMaxTimeoutDuration =
|
|
|
+ serverMinTimeout.toLong(TimeUnit.MILLISECONDS) + 200;
|
|
|
+ final TimeDuration serverMaxTimeout = TimeDuration.valueOf(
|
|
|
+ serverMaxTimeoutDuration, serverMinTimeoutUnit);
|
|
|
+ RaftServerConfigKeys.Rpc.setTimeoutMin(properties,
|
|
|
+ serverMinTimeout);
|
|
|
+ RaftServerConfigKeys.Rpc.setTimeoutMax(properties,
|
|
|
+ serverMaxTimeout);
|
|
|
+
|
|
|
+ // Enable batch append on raft server
|
|
|
+ RaftServerConfigKeys.Log.Appender.setBatchEnabled(properties, true);
|
|
|
+
|
|
|
+ // Set the number of maximum cached segments
|
|
|
+ RaftServerConfigKeys.Log.setMaxCachedSegmentNum(properties, 2);
|
|
|
+
|
|
|
+ // Set the client request timeout
|
|
|
+ TimeUnit clientRequestTimeoutUnit =
|
|
|
+ OMConfigKeys.OZONE_OM_RATIS_CLIENT_REQUEST_TIMEOUT_DEFAULT.getUnit();
|
|
|
+ long clientRequestTimeoutDuration = conf.getTimeDuration(
|
|
|
+ OMConfigKeys.OZONE_OM_RATIS_CLIENT_REQUEST_TIMEOUT_KEY,
|
|
|
+ OMConfigKeys.OZONE_OM_RATIS_CLIENT_REQUEST_TIMEOUT_DEFAULT
|
|
|
+ .getDuration(), clientRequestTimeoutUnit);
|
|
|
+ final TimeDuration clientRequestTimeout = TimeDuration.valueOf(
|
|
|
+ clientRequestTimeoutDuration, clientRequestTimeoutUnit);
|
|
|
+ RaftClientConfigKeys.Rpc.setRequestTimeout(properties,
|
|
|
+ clientRequestTimeout);
|
|
|
+
|
|
|
+ // TODO: set max write buffer size
|
|
|
+
|
|
|
+ /**
|
|
|
+ * TODO: when state machine is implemented, enable StateMachineData sync
|
|
|
+ * and set sync timeout and number of sync retries.
|
|
|
+ */
|
|
|
+
|
|
|
+ /**
|
|
|
+ * TODO: set following ratis leader election related configs when
|
|
|
+ * replicated ratis server is implemented.
|
|
|
+ * 1. leader election timeout
|
|
|
+ * 2. node failure timeout
|
|
|
+ * 3.
|
|
|
+ */
|
|
|
+
|
|
|
+ /**
|
|
|
+ * TODO: when ratis snapshots are implemented, set snapshot threshold and
|
|
|
+ * queue size.
|
|
|
+ */
|
|
|
+
|
|
|
+ return properties;
|
|
|
+ }
|
|
|
+
|
|
|
+ public int getServerPort() {
|
|
|
+ return port;
|
|
|
+ }
|
|
|
+
|
|
|
+ @VisibleForTesting
|
|
|
+ public LifeCycle.State getServerState() {
|
|
|
+ return server.getLifeCycleState();
|
|
|
+ }
|
|
|
+
|
|
|
+ public static String getOMRatisDirectory(Configuration conf) {
|
|
|
+ String storageDir = conf.get(OMConfigKeys.OZONE_OM_RATIS_STORAGE_DIR);
|
|
|
+
|
|
|
+ if (Strings.isNullOrEmpty(storageDir)) {
|
|
|
+ storageDir = HddsServerUtil.getDefaultRatisDirectory(conf);
|
|
|
+ }
|
|
|
+ return storageDir;
|
|
|
+ }
|
|
|
+
|
|
|
+}
|